1use std::collections::BTreeMap;
8use std::fmt;
9use std::fs::{File, OpenOptions};
10use std::io::{self, Write};
11use std::path::PathBuf;
12use std::sync::{Mutex, OnceLock};
13
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16
/// Environment variable naming the JSONL sink path. A missing or blank value
/// leaves the sink disabled (see `ProfileSinkConfig::from_env_vars`).
pub const PROFILE_JSONL_ENV: &str = "FERRUM_PROFILE_JSONL";
/// Environment variable supplying the commit SHA stamped on events; blank
/// values are treated as absent.
pub const PROFILE_COMMIT_SHA_ENV: &str = "FERRUM_PROFILE_COMMIT_SHA";
/// Environment variable supplying the environment hash. Values that do not
/// start with `sha256:` are replaced by `sha256:unknown`.
pub const PROFILE_ENV_HASH_ENV: &str = "FERRUM_PROFILE_ENV_HASH";
/// Environment variable supplying the model name; blank values fall back to
/// `unknown`.
pub const PROFILE_MODEL_ENV: &str = "FERRUM_PROFILE_MODEL";
/// Environment variable supplying the concurrency level. Values that are not
/// a positive `u32` fall back to `1`.
pub const PROFILE_CONCURRENCY_ENV: &str = "FERRUM_PROFILE_CONCURRENCY";
/// Environment variable holding the runtime flags as JSON text. Anything that
/// is not a JSON object is replaced by an empty object.
pub const PROFILE_RUNTIME_FLAGS_JSON_ENV: &str = "FERRUM_PROFILE_RUNTIME_FLAGS_JSON";
23
/// Process-wide profile writer. It is set at most once: either explicitly by
/// `configure_global_profile` or lazily from the environment by
/// `global_profile`.
static GLOBAL_PROFILE: OnceLock<ProfileJsonlWriter> = OnceLock::new();
25
26pub fn global_profile() -> &'static ProfileJsonlWriter {
29 GLOBAL_PROFILE.get_or_init(ProfileJsonlWriter::from_env)
30}
31
32pub fn configure_global_profile(config: ProfileSinkConfig) -> io::Result<bool> {
36 let writer = ProfileJsonlWriter::from_config(config)?;
37 Ok(GLOBAL_PROFILE.set(writer).is_ok())
38}
39
40pub fn flush_global_profile() {
41 if let Some(writer) = GLOBAL_PROFILE.get() {
42 let _ = writer.flush();
43 }
44}
45
/// Keys that must be present in a JSON object before it is accepted as a
/// profile event by `parse_profile_event_value`. Presence is checked on the
/// key only, so a key holding `null` still counts as present.
pub const REQUIRED_PROFILE_EVENT_FIELDS: &[&str] = &[
    "event",
    "commit_sha",
    "env_hash",
    "model",
    "concurrency",
    "shape",
    "stage_us",
    "graph_enabled",
    "runtime_flags",
];
58
/// One record of the profile JSONL stream.
///
/// The constraints noted on the fields are the ones enforced by
/// `ProfileEvent::validate`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ProfileEvent {
    /// Event group name; must not be blank.
    pub event: String,
    /// Commit SHA of the build, if known; must not be blank when present.
    pub commit_sha: Option<String>,
    /// Environment hash; must start with `sha256:`.
    pub env_hash: String,
    /// Model name; must not be blank.
    pub model: String,
    /// Concurrency level; must be greater than zero.
    pub concurrency: u32,
    /// Free-form shape fields, keyed by name.
    pub shape: BTreeMap<String, Value>,
    /// Free-form per-stage values, keyed by stage name (presumably
    /// microseconds, judging by the field name; confirm with producers).
    pub stage_us: BTreeMap<String, Value>,
    /// Graph-mode flag recorded with the event.
    pub graph_enabled: bool,
    /// Runtime flags; must be a JSON object.
    pub runtime_flags: Value,
    /// Optional origin tag; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    /// Optional raw source line; omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_line: Option<String>,
}
91
/// Run-level metadata that a `ProfileJsonlWriter` copies into every event it
/// emits.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ProfileMetadata {
    /// Commit SHA of the build, if known.
    pub commit_sha: Option<String>,
    /// Environment hash (`sha256:unknown` by default).
    pub env_hash: String,
    /// Model name (`unknown` by default).
    pub model: String,
    /// Concurrency level (`1` by default).
    pub concurrency: u32,
    /// Runtime flags as a JSON value (an empty object by default).
    pub runtime_flags: Value,
}
100
101impl Default for ProfileMetadata {
102 fn default() -> Self {
103 Self {
104 commit_sha: None,
105 env_hash: "sha256:unknown".to_string(),
106 model: "unknown".to_string(),
107 concurrency: 1,
108 runtime_flags: Value::Object(serde_json::Map::new()),
109 }
110 }
111}
112
113impl ProfileMetadata {
114 pub fn from_env() -> Self {
115 Self::from_env_vars(std::env::vars())
116 }
117
118 pub fn from_env_vars<I, K, V>(vars: I) -> Self
119 where
120 I: IntoIterator<Item = (K, V)>,
121 K: Into<String>,
122 V: Into<String>,
123 {
124 let vars = env_vars_map(vars);
125 Self::from_env_map(&vars)
126 }
127
128 fn from_env_map(vars: &BTreeMap<String, String>) -> Self {
129 let commit_sha = vars
130 .get(PROFILE_COMMIT_SHA_ENV)
131 .filter(|value| !value.trim().is_empty());
132 let env_hash = vars
133 .get(PROFILE_ENV_HASH_ENV)
134 .filter(|value| value.starts_with("sha256:"))
135 .cloned()
136 .unwrap_or_else(|| "sha256:unknown".to_string());
137 let model = vars
138 .get(PROFILE_MODEL_ENV)
139 .filter(|value| !value.trim().is_empty())
140 .cloned()
141 .unwrap_or_else(|| "unknown".to_string());
142 let concurrency = vars
143 .get(PROFILE_CONCURRENCY_ENV)
144 .and_then(|value| value.parse::<u32>().ok())
145 .filter(|value| *value > 0)
146 .unwrap_or(1);
147 let runtime_flags = vars
148 .get(PROFILE_RUNTIME_FLAGS_JSON_ENV)
149 .and_then(|value| serde_json::from_str::<Value>(&value).ok())
150 .filter(Value::is_object)
151 .unwrap_or_else(|| Value::Object(serde_json::Map::new()));
152
153 Self {
154 commit_sha: commit_sha.cloned(),
155 env_hash,
156 model,
157 concurrency,
158 runtime_flags,
159 }
160 }
161}
162
/// Configuration of the profile sink: where to write, and which metadata to
/// stamp on every event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ProfileSinkConfig {
    /// Destination JSONL file; `None` means the sink is disabled. Omitted from
    /// the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub jsonl_path: Option<PathBuf>,
    /// Metadata copied into emitted events; defaults to
    /// `ProfileMetadata::default()` when absent from the input.
    #[serde(default)]
    pub metadata: ProfileMetadata,
}
170
171impl ProfileSinkConfig {
172 pub fn disabled() -> Self {
173 Self {
174 jsonl_path: None,
175 metadata: ProfileMetadata::default(),
176 }
177 }
178
179 pub fn enabled(jsonl_path: PathBuf, metadata: ProfileMetadata) -> Self {
180 Self {
181 jsonl_path: Some(jsonl_path),
182 metadata,
183 }
184 }
185
186 pub fn from_env() -> Self {
187 Self::from_env_vars(std::env::vars())
188 }
189
190 pub fn from_env_vars<I, K, V>(vars: I) -> Self
191 where
192 I: IntoIterator<Item = (K, V)>,
193 K: Into<String>,
194 V: Into<String>,
195 {
196 let vars = env_vars_map(vars);
197 match vars.get(PROFILE_JSONL_ENV) {
198 Some(path) if !path.trim().is_empty() => {
199 Self::enabled(PathBuf::from(path), ProfileMetadata::from_env_map(&vars))
200 }
201 _ => Self::disabled(),
202 }
203 }
204}
205
/// Collects `(name, value)` pairs into an ordered map of owned strings.
///
/// When a name occurs more than once, the value seen last wins.
fn env_vars_map<I, K, V>(vars: I) -> BTreeMap<String, String>
where
    I: IntoIterator<Item = (K, V)>,
    K: Into<String>,
    V: Into<String>,
{
    let mut collected = BTreeMap::new();
    for (name, value) in vars {
        collected.insert(name.into(), value.into());
    }
    collected
}
216
217impl ProfileEvent {
218 pub fn validate(&self) -> Result<(), ProfileValidationError> {
219 if self.event.trim().is_empty() {
220 return Err(ProfileValidationError::new("event must be non-empty"));
221 }
222 if let Some(commit_sha) = &self.commit_sha {
223 if commit_sha.trim().is_empty() {
224 return Err(ProfileValidationError::new(
225 "commit_sha must be non-empty when present",
226 ));
227 }
228 }
229 if !self.env_hash.starts_with("sha256:") {
230 return Err(ProfileValidationError::new(
231 "env_hash must start with sha256:",
232 ));
233 }
234 if self.model.trim().is_empty() {
235 return Err(ProfileValidationError::new("model must be non-empty"));
236 }
237 if self.concurrency == 0 {
238 return Err(ProfileValidationError::new("concurrency must be > 0"));
239 }
240 if !self.runtime_flags.is_object() {
241 return Err(ProfileValidationError::new(
242 "runtime_flags must be an object",
243 ));
244 }
245 Ok(())
246 }
247}
248
/// Sink that appends profile events to a JSONL file, or drops them when it is
/// disabled.
pub struct ProfileJsonlWriter {
    /// Sink state (disabled, or an open file with its path) behind a mutex.
    inner: Mutex<ProfileJsonlWriterInner>,
    /// Run-level metadata copied into every event built by `push_event`.
    metadata: ProfileMetadata,
}
253
/// Backing state of a `ProfileJsonlWriter`.
enum ProfileJsonlWriterInner {
    /// No sink is configured; pushed events are dropped.
    Disabled,
    /// Events are appended to `file`, which was opened at `path`.
    File { path: PathBuf, file: File },
}
258
259impl ProfileJsonlWriter {
260 pub fn from_env() -> Self {
261 let config = ProfileSinkConfig::from_env();
262 match Self::from_config(config) {
263 Ok(writer) => writer,
264 Err(err) => {
265 eprintln!("[profile-jsonl] failed to open configured sink: {err}");
266 Self::disabled()
267 }
268 }
269 }
270
271 pub fn from_config(config: ProfileSinkConfig) -> io::Result<Self> {
272 match config.jsonl_path {
273 Some(path) => Self::enabled(path, config.metadata),
274 None => Ok(Self::disabled()),
275 }
276 }
277
278 pub fn enabled(path: PathBuf, metadata: ProfileMetadata) -> io::Result<Self> {
279 if let Some(parent) = path.parent() {
280 std::fs::create_dir_all(parent)?;
281 }
282 let file = OpenOptions::new().create(true).append(true).open(&path)?;
283 Ok(Self {
284 inner: Mutex::new(ProfileJsonlWriterInner::File { path, file }),
285 metadata,
286 })
287 }
288
289 pub fn disabled() -> Self {
290 Self {
291 inner: Mutex::new(ProfileJsonlWriterInner::Disabled),
292 metadata: ProfileMetadata::default(),
293 }
294 }
295
296 pub fn is_enabled(&self) -> bool {
297 matches!(
298 *self.inner.lock().unwrap(),
299 ProfileJsonlWriterInner::File { .. }
300 )
301 }
302
303 pub fn push_event(
304 &self,
305 event: impl Into<String>,
306 shape: BTreeMap<String, Value>,
307 stage_us: BTreeMap<String, Value>,
308 graph_enabled: bool,
309 ) -> io::Result<()> {
310 if !self.is_enabled() {
311 return Ok(());
312 }
313 let event = ProfileEvent {
314 event: event.into(),
315 commit_sha: self.metadata.commit_sha.clone(),
316 env_hash: self.metadata.env_hash.clone(),
317 model: self.metadata.model.clone(),
318 concurrency: self.metadata.concurrency,
319 shape,
320 stage_us,
321 graph_enabled,
322 runtime_flags: self.metadata.runtime_flags.clone(),
323 source: Some("native".to_string()),
324 source_line: None,
325 };
326 event
327 .validate()
328 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
329
330 let mut inner = self.inner.lock().unwrap();
331 if let ProfileJsonlWriterInner::File { file, .. } = &mut *inner {
332 serde_json::to_writer(&mut *file, &event)?;
333 file.write_all(b"\n")?;
334 file.flush()?;
335 }
336 Ok(())
337 }
338
339 pub fn flush(&self) -> io::Result<()> {
340 let mut inner = self.inner.lock().unwrap();
341 if let ProfileJsonlWriterInner::File { file, .. } = &mut *inner {
342 file.flush()?;
343 }
344 Ok(())
345 }
346
347 pub fn path(&self) -> Option<PathBuf> {
348 let inner = self.inner.lock().unwrap();
349 match &*inner {
350 ProfileJsonlWriterInner::Disabled => None,
351 ProfileJsonlWriterInner::File { path, .. } => Some(path.clone()),
352 }
353 }
354}
355
356pub fn profile_fields_from_json(value: Value) -> BTreeMap<String, Value> {
357 match value {
358 Value::Object(map) => map.into_iter().collect(),
359 _ => BTreeMap::new(),
360 }
361}
362
/// Error describing why a profile event or a JSONL document was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProfileValidationError {
    message: String,
}

impl ProfileValidationError {
    /// Creates an error carrying `message` verbatim.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }

    /// Creates an error whose message is prefixed with `line <line_no>: `.
    pub fn line(line_no: usize, message: impl Into<String>) -> Self {
        let detail: String = message.into();
        Self::new(format!("line {line_no}: {}", detail))
    }

    /// Returns the error text.
    pub fn message(&self) -> &str {
        self.message.as_str()
    }
}

impl fmt::Display for ProfileValidationError {
    /// Writes the message as-is, without applying any formatter padding.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.message())
    }
}

impl std::error::Error for ProfileValidationError {}
393
394pub fn parse_profile_event_value(value: Value) -> Result<ProfileEvent, ProfileValidationError> {
399 let object = value
400 .as_object()
401 .ok_or_else(|| ProfileValidationError::new("profile event must be a JSON object"))?;
402 for key in REQUIRED_PROFILE_EVENT_FIELDS {
403 if !object.contains_key(*key) {
404 return Err(ProfileValidationError::new(format!(
405 "missing required field: {key}"
406 )));
407 }
408 }
409 let event: ProfileEvent = serde_json::from_value(value)
410 .map_err(|err| ProfileValidationError::new(format!("invalid profile event: {err}")))?;
411 event.validate()?;
412 Ok(event)
413}
414
415pub fn parse_profile_jsonl_str(input: &str) -> Result<Vec<ProfileEvent>, ProfileValidationError> {
417 let mut events = Vec::new();
418 for (line_idx, line) in input.lines().enumerate() {
419 let line_no = line_idx + 1;
420 let line = line.trim();
421 if line.is_empty() {
422 continue;
423 }
424 let value: Value = serde_json::from_str(line)
425 .map_err(|err| ProfileValidationError::line(line_no, format!("invalid JSON: {err}")))?;
426 let event = parse_profile_event_value(value)
427 .map_err(|err| ProfileValidationError::line(line_no, err.to_string()))?;
428 events.push(event);
429 }
430 Ok(events)
431}
432
433pub fn require_profile_event_groups(
435 events: &[ProfileEvent],
436 required: &[&str],
437) -> Result<(), ProfileValidationError> {
438 let mut missing = Vec::new();
439 for required_event in required {
440 if !events.iter().any(|event| event.event == *required_event) {
441 missing.push(*required_event);
442 }
443 }
444 if missing.is_empty() {
445 Ok(())
446 } else {
447 Err(ProfileValidationError::new(format!(
448 "missing profile event groups: {}",
449 missing.join(", ")
450 )))
451 }
452}
453
454#[cfg(test)]
455mod tests {
456 use super::*;
457
    /// Fixture: a JSON event that contains every required field plus the
    /// optional `source` and `source_line` fields.
    fn valid_event_value() -> Value {
        serde_json::json!({
            "event": "unified_prof",
            "commit_sha": "abc123",
            "env_hash": "sha256:env",
            "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
            "concurrency": 32,
            "shape": {"batch": 32, "active_blocks": 65},
            "stage_us": {"model": 12500.0, "decode_post": 350.0},
            "graph_enabled": true,
            "runtime_flags": {
                "schema_version": 1,
                "entries": []
            },
            "source": "server_log",
            "source_line": "[unified-prof] model=12500 decode_post=350"
        })
    }
476
477 #[test]
478 fn profile_event_round_trips_and_validates() {
479 let event = parse_profile_event_value(valid_event_value()).expect("valid event");
480 event.validate().expect("valid schema");
481 assert_eq!(event.event, "unified_prof");
482 assert_eq!(event.concurrency, 32);
483 assert_eq!(
484 event.shape.get("active_blocks").and_then(Value::as_i64),
485 Some(65)
486 );
487
488 let encoded = serde_json::to_string(&event).expect("serialize");
489 let decoded = parse_profile_event_value(serde_json::from_str(&encoded).unwrap())
490 .expect("decode serialized event");
491 assert_eq!(decoded, event);
492 }
493
494 #[test]
495 fn profile_event_rejects_every_missing_required_field() {
496 for key in REQUIRED_PROFILE_EVENT_FIELDS {
497 let mut event = valid_event_value();
498 event.as_object_mut().unwrap().remove(*key);
499 let err = parse_profile_event_value(event).unwrap_err();
500 assert!(
501 err.message()
502 .contains(&format!("missing required field: {key}")),
503 "unexpected error for missing {key}: {}",
504 err.message()
505 );
506 }
507 }
508
509 #[test]
510 fn profile_event_accepts_null_commit_sha_when_key_is_present() {
511 let mut event = valid_event_value();
512 event["commit_sha"] = Value::Null;
513 let parsed = parse_profile_event_value(event).expect("null commit_sha is allowed");
514 assert_eq!(parsed.commit_sha, None);
515 }
516
517 #[test]
518 fn profile_event_rejects_bad_env_hash_and_zero_concurrency() {
519 let mut bad_hash = valid_event_value();
520 bad_hash["env_hash"] = Value::String("env".to_string());
521 assert!(parse_profile_event_value(bad_hash)
522 .unwrap_err()
523 .message()
524 .contains("env_hash"));
525
526 let mut zero_concurrency = valid_event_value();
527 zero_concurrency["concurrency"] = Value::Number(0.into());
528 assert!(parse_profile_event_value(zero_concurrency)
529 .unwrap_err()
530 .message()
531 .contains("concurrency"));
532 }
533
534 #[test]
535 fn profile_jsonl_parses_multiple_events() {
536 let one = serde_json::to_string(&valid_event_value()).unwrap();
537 let mut two_value = valid_event_value();
538 two_value["event"] = Value::String("bucket_prof".to_string());
539 let two = serde_json::to_string(&two_value).unwrap();
540
541 let events = parse_profile_jsonl_str(&format!("{one}\n\n{two}\n")).unwrap();
542 assert_eq!(events.len(), 2);
543 require_profile_event_groups(&events, &["unified_prof", "bucket_prof"]).unwrap();
544 }
545
546 #[test]
547 fn required_event_groups_reject_missing_groups() {
548 let event = parse_profile_event_value(valid_event_value()).unwrap();
549 let err = require_profile_event_groups(&[event], &["unified_prof", "bucket_prof"])
550 .expect_err("missing bucket profile");
551 assert!(err.message().contains("bucket_prof"));
552 }
553
554 #[test]
555 fn profile_parser_covers_three_fixture_artifact_shapes() {
556 let default_graph_on = [
557 serde_json::json!({
558 "event": "graph_prof",
559 "commit_sha": "abc123",
560 "env_hash": "sha256:graph",
561 "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
562 "concurrency": 32,
563 "shape": {"call": 64},
564 "stage_us": {"upload": 110, "launch": 240, "sync": 0, "total": 350},
565 "graph_enabled": true,
566 "runtime_flags": {"preset": "m3_qwen3_30b_a3b_int4"}
567 }),
568 serde_json::json!({
569 "event": "unified_prof",
570 "commit_sha": "abc123",
571 "env_hash": "sha256:graph",
572 "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
573 "concurrency": 32,
574 "shape": {"items": 32, "decode": 32},
575 "stage_us": {"total": 14900, "model": 14000, "decode_post": 600},
576 "graph_enabled": true,
577 "runtime_flags": {"preset": "m3_qwen3_30b_a3b_int4"}
578 }),
579 ];
580 let graph_off_route_dump = [
581 serde_json::json!({
582 "event": "moe_dump",
583 "commit_sha": "abc123",
584 "env_hash": "sha256:route",
585 "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
586 "concurrency": 32,
587 "shape": {"batch_x_topk": 256, "active_blocks": 65, "unique_experts": 61},
588 "stage_us": {},
589 "graph_enabled": false,
590 "runtime_flags": {"FERRUM_MOE_GRAPH": "0"}
591 }),
592 serde_json::json!({
593 "event": "vllm_moe_config",
594 "commit_sha": "abc123",
595 "env_hash": "sha256:route",
596 "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
597 "concurrency": 32,
598 "shape": {"batch_x_topk": 256, "prob_m": 32, "thread_k": 64, "thread_n": 128},
599 "stage_us": {},
600 "graph_enabled": false,
601 "runtime_flags": {"FERRUM_MOE_GRAPH": "0"}
602 }),
603 serde_json::json!({
604 "event": "bucket_prof",
605 "commit_sha": "abc123",
606 "env_hash": "sha256:route",
607 "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
608 "concurrency": 32,
609 "shape": {"layers": 48},
610 "stage_us": {"gemm1": 5860, "gemm3": 2890, "combine": 250},
611 "graph_enabled": false,
612 "runtime_flags": {"FERRUM_MOE_PROFILE": "1"}
613 }),
614 ];
615 let fa_layout_attention_ab = [
616 serde_json::json!({
617 "event": "unified_layer_prof",
618 "commit_sha": "abc123",
619 "env_hash": "sha256:fa",
620 "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
621 "concurrency": 32,
622 "shape": {"m": 128, "seqs": 4, "sampled": 4},
623 "stage_us": {"attn": 46000, "moe": 36000, "layer_sum": 91000},
624 "graph_enabled": false,
625 "runtime_flags": {"FERRUM_FA_LAYOUT_VARLEN": "1"}
626 }),
627 serde_json::json!({
628 "event": "unified_prof",
629 "commit_sha": "abc123",
630 "env_hash": "sha256:fa",
631 "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
632 "concurrency": 32,
633 "shape": {"items": 32, "prefill": 4, "decode": 28},
634 "stage_us": {"total": 61000, "model": 59000, "decode_post": 350},
635 "graph_enabled": false,
636 "runtime_flags": {"FERRUM_FA_LAYOUT_VARLEN": "1"}
637 }),
638 ];
639
640 for (fixture, required) in [
641 (&default_graph_on[..], &["graph_prof", "unified_prof"][..]),
642 (
643 &graph_off_route_dump[..],
644 &["moe_dump", "vllm_moe_config", "bucket_prof"][..],
645 ),
646 (
647 &fa_layout_attention_ab[..],
648 &["unified_layer_prof", "unified_prof"][..],
649 ),
650 ] {
651 let jsonl = fixture
652 .iter()
653 .map(|value| serde_json::to_string(value).unwrap())
654 .collect::<Vec<_>>()
655 .join("\n");
656 let events = parse_profile_jsonl_str(&jsonl).unwrap();
657 require_profile_event_groups(&events, required).unwrap();
658 }
659 }
660
661 #[test]
662 fn disabled_profile_writer_is_noop() {
663 let writer = ProfileJsonlWriter::disabled();
664 assert!(!writer.is_enabled());
665 writer
666 .push_event(
667 "iter_prof",
668 profile_fields_from_json(serde_json::json!({"batch_size": 1})),
669 profile_fields_from_json(serde_json::json!({"total": 10})),
670 false,
671 )
672 .unwrap();
673 assert_eq!(writer.path(), None);
674 }
675
676 #[test]
677 fn enabled_profile_writer_writes_valid_jsonl() {
678 let dir = tempdir();
679 let path = dir.join("profile.jsonl");
680 let writer = ProfileJsonlWriter::enabled(
681 path.clone(),
682 ProfileMetadata {
683 commit_sha: Some("abc123".to_string()),
684 env_hash: "sha256:env".to_string(),
685 model: "model".to_string(),
686 concurrency: 32,
687 runtime_flags: serde_json::json!({"schema_version": 1}),
688 },
689 )
690 .unwrap();
691
692 writer
693 .push_event(
694 "bucket_prof",
695 profile_fields_from_json(serde_json::json!({"layers": 48})),
696 profile_fields_from_json(serde_json::json!({"gemm1": 1200, "gemm3": 800})),
697 true,
698 )
699 .unwrap();
700 writer.flush().unwrap();
701
702 let jsonl = std::fs::read_to_string(&path).unwrap();
703 let events = parse_profile_jsonl_str(&jsonl).unwrap();
704 assert_eq!(events.len(), 1);
705 assert_eq!(events[0].event, "bucket_prof");
706 assert_eq!(events[0].concurrency, 32);
707 let _ = std::fs::remove_dir_all(&dir);
708 }
709
710 #[test]
711 fn typed_profile_sink_config_builds_writer() {
712 let dir = tempdir();
713 let path = dir.join("typed-profile.jsonl");
714 let config = ProfileSinkConfig::enabled(
715 path.clone(),
716 ProfileMetadata {
717 commit_sha: Some("def456".to_string()),
718 env_hash: "sha256:typed".to_string(),
719 model: "typed-model".to_string(),
720 concurrency: 16,
721 runtime_flags: serde_json::json!({"source": "typed"}),
722 },
723 );
724 let writer = ProfileJsonlWriter::from_config(config).unwrap();
725 assert!(writer.is_enabled());
726 assert_eq!(writer.path().as_deref(), Some(path.as_path()));
727
728 writer
729 .push_event(
730 "unified_prof",
731 profile_fields_from_json(serde_json::json!({"items": 16})),
732 profile_fields_from_json(serde_json::json!({"total": 1000})),
733 false,
734 )
735 .unwrap();
736 writer.flush().unwrap();
737
738 let jsonl = std::fs::read_to_string(&path).unwrap();
739 let events = parse_profile_jsonl_str(&jsonl).unwrap();
740 assert_eq!(events[0].commit_sha.as_deref(), Some("def456"));
741 assert_eq!(events[0].env_hash, "sha256:typed");
742 assert_eq!(events[0].model, "typed-model");
743 assert_eq!(events[0].concurrency, 16);
744 assert_eq!(events[0].runtime_flags["source"], "typed");
745 let _ = std::fs::remove_dir_all(&dir);
746 }
747
748 #[test]
749 fn profile_metadata_parses_env_snapshot() {
750 let metadata = ProfileMetadata::from_env_vars([
751 (PROFILE_COMMIT_SHA_ENV, "abc123"),
752 (PROFILE_ENV_HASH_ENV, "sha256:env"),
753 (PROFILE_MODEL_ENV, "model"),
754 (PROFILE_CONCURRENCY_ENV, "32"),
755 (PROFILE_RUNTIME_FLAGS_JSON_ENV, r#"{"fa_layout":true}"#),
756 ]);
757
758 assert_eq!(metadata.commit_sha.as_deref(), Some("abc123"));
759 assert_eq!(metadata.env_hash, "sha256:env");
760 assert_eq!(metadata.model, "model");
761 assert_eq!(metadata.concurrency, 32);
762 assert_eq!(metadata.runtime_flags["fa_layout"], true);
763 }
764
765 #[test]
766 fn profile_sink_config_parses_env_snapshot() {
767 let config = ProfileSinkConfig::from_env_vars([
768 (PROFILE_JSONL_ENV, "/tmp/profile.jsonl"),
769 (PROFILE_ENV_HASH_ENV, "sha256:env"),
770 (PROFILE_MODEL_ENV, "model"),
771 ]);
772
773 assert_eq!(
774 config.jsonl_path.as_deref(),
775 Some(std::path::Path::new("/tmp/profile.jsonl"))
776 );
777 assert_eq!(config.metadata.env_hash, "sha256:env");
778 assert_eq!(config.metadata.model, "model");
779
780 let disabled = ProfileSinkConfig::from_env_vars([(PROFILE_JSONL_ENV, "")]);
781 assert_eq!(disabled.jsonl_path, None);
782 }
783
784 fn tempdir() -> std::path::PathBuf {
785 let d = std::env::temp_dir().join(format!(
786 "ferrum-profile-test-{}",
787 std::time::SystemTime::now()
788 .duration_since(std::time::UNIX_EPOCH)
789 .unwrap()
790 .as_nanos()
791 ));
792 std::fs::create_dir_all(&d).unwrap();
793 d
794 }
795}