Skip to main content

ferrum_bench_core/
profile.rs

//! Structured profile event schema shared by benchmark runners and consumers.
//!
//! This intentionally accepts log-derived transitional events as long as they
//! carry the locked envelope fields. Native runtime emitters should use the
//! same shape and fill `shape` / `stage_us` with typed values.
6
7use std::collections::BTreeMap;
8use std::fmt;
9use std::fs::{File, OpenOptions};
10use std::io::{self, Write};
11use std::path::PathBuf;
12use std::sync::{Mutex, OnceLock};
13
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16
/// Environment variable naming the JSONL sink path. When it is unset or
/// blank, the env-derived sink config is disabled.
pub const PROFILE_JSONL_ENV: &str = "FERRUM_PROFILE_JSONL";
/// Environment variable carrying the commit SHA stamped on events. Blank
/// values are treated as absent.
pub const PROFILE_COMMIT_SHA_ENV: &str = "FERRUM_PROFILE_COMMIT_SHA";
/// Environment variable carrying the environment hash. Values that do not
/// start with `sha256:` fall back to `sha256:unknown`.
pub const PROFILE_ENV_HASH_ENV: &str = "FERRUM_PROFILE_ENV_HASH";
/// Environment variable carrying the model label. Blank values fall back to
/// `unknown`.
pub const PROFILE_MODEL_ENV: &str = "FERRUM_PROFILE_MODEL";
/// Environment variable carrying the concurrency. Values that are not a
/// positive `u32` fall back to `1`.
pub const PROFILE_CONCURRENCY_ENV: &str = "FERRUM_PROFILE_CONCURRENCY";
/// Environment variable carrying the runtime flags as JSON. Anything that is
/// not a JSON object falls back to an empty object.
pub const PROFILE_RUNTIME_FLAGS_JSON_ENV: &str = "FERRUM_PROFILE_RUNTIME_FLAGS_JSON";
23
24static GLOBAL_PROFILE: OnceLock<ProfileJsonlWriter> = OnceLock::new();
25
26/// Global structured profile writer, lazily configured from
27/// `FERRUM_PROFILE_JSONL`.
28pub fn global_profile() -> &'static ProfileJsonlWriter {
29    GLOBAL_PROFILE.get_or_init(ProfileJsonlWriter::from_env)
30}
31
32/// Configure the global profile writer from typed startup config.
33///
34/// Returns `Ok(false)` if another caller already initialized the global writer.
35pub fn configure_global_profile(config: ProfileSinkConfig) -> io::Result<bool> {
36    let writer = ProfileJsonlWriter::from_config(config)?;
37    Ok(GLOBAL_PROFILE.set(writer).is_ok())
38}
39
40pub fn flush_global_profile() {
41    if let Some(writer) = GLOBAL_PROFILE.get() {
42        let _ = writer.flush();
43    }
44}
45
/// Top-level keys that must be present on every JSONL profile event.
///
/// Presence is checked on the raw JSON object, so a key holding `null`
/// (for example `commit_sha`) still counts as present.
pub const REQUIRED_PROFILE_EVENT_FIELDS: &[&str] = &[
    "event",
    "commit_sha",
    "env_hash",
    "model",
    "concurrency",
    "shape",
    "stage_us",
    "graph_enabled",
    "runtime_flags",
];
58
59/// Stable profile event envelope.
60///
61/// Field order is schema-significant for JSON output. Do not reorder.
62#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
63pub struct ProfileEvent {
64    /// Event family, for example `unified_prof`, `bucket_prof`, or
65    /// `vllm_moe_config`.
66    pub event: String,
67    /// Git commit for the binary under test. `None` is allowed only when the
68    /// artifact producer cannot access VCS metadata, but the key must exist.
69    pub commit_sha: Option<String>,
70    /// Canonical runtime environment hash (`sha256:...`).
71    pub env_hash: String,
72    /// HuggingFace model id or local model label.
73    pub model: String,
74    /// Closed-loop concurrency for the profiled cell.
75    pub concurrency: u32,
76    /// Shape attributes such as batch size, top-k pairs, blocks, or sequence
77    /// lengths. Values are JSON scalars so producers can extend keys without a
78    /// schema bump.
79    pub shape: BTreeMap<String, Value>,
80    /// Timings in microseconds for the relevant stage or substage.
81    pub stage_us: BTreeMap<String, Value>,
82    /// Whether the profiled path ran under CUDA graph replay.
83    pub graph_enabled: bool,
84    /// Runtime flags/config snapshot that affected this event.
85    pub runtime_flags: Value,
86    #[serde(default, skip_serializing_if = "Option::is_none")]
87    pub source: Option<String>,
88    #[serde(default, skip_serializing_if = "Option::is_none")]
89    pub source_line: Option<String>,
90}
91
92#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
93pub struct ProfileMetadata {
94    pub commit_sha: Option<String>,
95    pub env_hash: String,
96    pub model: String,
97    pub concurrency: u32,
98    pub runtime_flags: Value,
99}
100
101impl Default for ProfileMetadata {
102    fn default() -> Self {
103        Self {
104            commit_sha: None,
105            env_hash: "sha256:unknown".to_string(),
106            model: "unknown".to_string(),
107            concurrency: 1,
108            runtime_flags: Value::Object(serde_json::Map::new()),
109        }
110    }
111}
112
113impl ProfileMetadata {
114    pub fn from_env() -> Self {
115        Self::from_env_vars(std::env::vars())
116    }
117
118    pub fn from_env_vars<I, K, V>(vars: I) -> Self
119    where
120        I: IntoIterator<Item = (K, V)>,
121        K: Into<String>,
122        V: Into<String>,
123    {
124        let vars = env_vars_map(vars);
125        Self::from_env_map(&vars)
126    }
127
128    fn from_env_map(vars: &BTreeMap<String, String>) -> Self {
129        let commit_sha = vars
130            .get(PROFILE_COMMIT_SHA_ENV)
131            .filter(|value| !value.trim().is_empty());
132        let env_hash = vars
133            .get(PROFILE_ENV_HASH_ENV)
134            .filter(|value| value.starts_with("sha256:"))
135            .cloned()
136            .unwrap_or_else(|| "sha256:unknown".to_string());
137        let model = vars
138            .get(PROFILE_MODEL_ENV)
139            .filter(|value| !value.trim().is_empty())
140            .cloned()
141            .unwrap_or_else(|| "unknown".to_string());
142        let concurrency = vars
143            .get(PROFILE_CONCURRENCY_ENV)
144            .and_then(|value| value.parse::<u32>().ok())
145            .filter(|value| *value > 0)
146            .unwrap_or(1);
147        let runtime_flags = vars
148            .get(PROFILE_RUNTIME_FLAGS_JSON_ENV)
149            .and_then(|value| serde_json::from_str::<Value>(&value).ok())
150            .filter(Value::is_object)
151            .unwrap_or_else(|| Value::Object(serde_json::Map::new()));
152
153        Self {
154            commit_sha: commit_sha.cloned(),
155            env_hash,
156            model,
157            concurrency,
158            runtime_flags,
159        }
160    }
161}
162
163#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
164pub struct ProfileSinkConfig {
165    #[serde(default, skip_serializing_if = "Option::is_none")]
166    pub jsonl_path: Option<PathBuf>,
167    #[serde(default)]
168    pub metadata: ProfileMetadata,
169}
170
171impl ProfileSinkConfig {
172    pub fn disabled() -> Self {
173        Self {
174            jsonl_path: None,
175            metadata: ProfileMetadata::default(),
176        }
177    }
178
179    pub fn enabled(jsonl_path: PathBuf, metadata: ProfileMetadata) -> Self {
180        Self {
181            jsonl_path: Some(jsonl_path),
182            metadata,
183        }
184    }
185
186    pub fn from_env() -> Self {
187        Self::from_env_vars(std::env::vars())
188    }
189
190    pub fn from_env_vars<I, K, V>(vars: I) -> Self
191    where
192        I: IntoIterator<Item = (K, V)>,
193        K: Into<String>,
194        V: Into<String>,
195    {
196        let vars = env_vars_map(vars);
197        match vars.get(PROFILE_JSONL_ENV) {
198            Some(path) if !path.trim().is_empty() => {
199                Self::enabled(PathBuf::from(path), ProfileMetadata::from_env_map(&vars))
200            }
201            _ => Self::disabled(),
202        }
203    }
204}
205
/// Collects `(name, value)` pairs into an owned, name-sorted map.
///
/// When a name occurs more than once, the value seen last wins.
fn env_vars_map<I, K, V>(vars: I) -> BTreeMap<String, String>
where
    I: IntoIterator<Item = (K, V)>,
    K: Into<String>,
    V: Into<String>,
{
    let mut collected = BTreeMap::new();
    for (name, value) in vars {
        collected.insert(name.into(), value.into());
    }
    collected
}
216
217impl ProfileEvent {
218    pub fn validate(&self) -> Result<(), ProfileValidationError> {
219        if self.event.trim().is_empty() {
220            return Err(ProfileValidationError::new("event must be non-empty"));
221        }
222        if let Some(commit_sha) = &self.commit_sha {
223            if commit_sha.trim().is_empty() {
224                return Err(ProfileValidationError::new(
225                    "commit_sha must be non-empty when present",
226                ));
227            }
228        }
229        if !self.env_hash.starts_with("sha256:") {
230            return Err(ProfileValidationError::new(
231                "env_hash must start with sha256:",
232            ));
233        }
234        if self.model.trim().is_empty() {
235            return Err(ProfileValidationError::new("model must be non-empty"));
236        }
237        if self.concurrency == 0 {
238            return Err(ProfileValidationError::new("concurrency must be > 0"));
239        }
240        if !self.runtime_flags.is_object() {
241            return Err(ProfileValidationError::new(
242                "runtime_flags must be an object",
243            ));
244        }
245        Ok(())
246    }
247}
248
249pub struct ProfileJsonlWriter {
250    inner: Mutex<ProfileJsonlWriterInner>,
251    metadata: ProfileMetadata,
252}
253
/// Sink state of a `ProfileJsonlWriter`.
enum ProfileJsonlWriterInner {
    /// No sink configured; pushed events are discarded.
    Disabled,
    /// Events are written to `file`, which was opened at `path`.
    File { path: PathBuf, file: File },
}
258
259impl ProfileJsonlWriter {
260    pub fn from_env() -> Self {
261        let config = ProfileSinkConfig::from_env();
262        match Self::from_config(config) {
263            Ok(writer) => writer,
264            Err(err) => {
265                eprintln!("[profile-jsonl] failed to open configured sink: {err}");
266                Self::disabled()
267            }
268        }
269    }
270
271    pub fn from_config(config: ProfileSinkConfig) -> io::Result<Self> {
272        match config.jsonl_path {
273            Some(path) => Self::enabled(path, config.metadata),
274            None => Ok(Self::disabled()),
275        }
276    }
277
278    pub fn enabled(path: PathBuf, metadata: ProfileMetadata) -> io::Result<Self> {
279        if let Some(parent) = path.parent() {
280            std::fs::create_dir_all(parent)?;
281        }
282        let file = OpenOptions::new().create(true).append(true).open(&path)?;
283        Ok(Self {
284            inner: Mutex::new(ProfileJsonlWriterInner::File { path, file }),
285            metadata,
286        })
287    }
288
289    pub fn disabled() -> Self {
290        Self {
291            inner: Mutex::new(ProfileJsonlWriterInner::Disabled),
292            metadata: ProfileMetadata::default(),
293        }
294    }
295
296    pub fn is_enabled(&self) -> bool {
297        matches!(
298            *self.inner.lock().unwrap(),
299            ProfileJsonlWriterInner::File { .. }
300        )
301    }
302
303    pub fn push_event(
304        &self,
305        event: impl Into<String>,
306        shape: BTreeMap<String, Value>,
307        stage_us: BTreeMap<String, Value>,
308        graph_enabled: bool,
309    ) -> io::Result<()> {
310        if !self.is_enabled() {
311            return Ok(());
312        }
313        let event = ProfileEvent {
314            event: event.into(),
315            commit_sha: self.metadata.commit_sha.clone(),
316            env_hash: self.metadata.env_hash.clone(),
317            model: self.metadata.model.clone(),
318            concurrency: self.metadata.concurrency,
319            shape,
320            stage_us,
321            graph_enabled,
322            runtime_flags: self.metadata.runtime_flags.clone(),
323            source: Some("native".to_string()),
324            source_line: None,
325        };
326        event
327            .validate()
328            .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
329
330        let mut inner = self.inner.lock().unwrap();
331        if let ProfileJsonlWriterInner::File { file, .. } = &mut *inner {
332            serde_json::to_writer(&mut *file, &event)?;
333            file.write_all(b"\n")?;
334            file.flush()?;
335        }
336        Ok(())
337    }
338
339    pub fn flush(&self) -> io::Result<()> {
340        let mut inner = self.inner.lock().unwrap();
341        if let ProfileJsonlWriterInner::File { file, .. } = &mut *inner {
342            file.flush()?;
343        }
344        Ok(())
345    }
346
347    pub fn path(&self) -> Option<PathBuf> {
348        let inner = self.inner.lock().unwrap();
349        match &*inner {
350            ProfileJsonlWriterInner::Disabled => None,
351            ProfileJsonlWriterInner::File { path, .. } => Some(path.clone()),
352        }
353    }
354}
355
356pub fn profile_fields_from_json(value: Value) -> BTreeMap<String, Value> {
357    match value {
358        Value::Object(map) => map.into_iter().collect(),
359        _ => BTreeMap::new(),
360    }
361}
362
/// Error describing why a profile event or JSONL payload was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProfileValidationError {
    /// Human-readable description; also the `Display` output.
    message: String,
}

impl ProfileValidationError {
    /// Creates an error carrying `message` verbatim.
    pub fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }

    /// Creates an error whose message is prefixed with `line {line_no}: `.
    pub fn line(line_no: usize, message: impl Into<String>) -> Self {
        Self::new(format!("line {line_no}: {}", message.into()))
    }

    /// The error text.
    pub fn message(&self) -> &str {
        self.message.as_str()
    }
}

impl fmt::Display for ProfileValidationError {
    /// Writes the message unchanged.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.message)
    }
}

impl std::error::Error for ProfileValidationError {}
393
394/// Parse and validate one JSON value as a [`ProfileEvent`].
395///
396/// This checks field presence before deserialization so optional values like
397/// `commit_sha: null` are distinct from a missing key.
398pub fn parse_profile_event_value(value: Value) -> Result<ProfileEvent, ProfileValidationError> {
399    let object = value
400        .as_object()
401        .ok_or_else(|| ProfileValidationError::new("profile event must be a JSON object"))?;
402    for key in REQUIRED_PROFILE_EVENT_FIELDS {
403        if !object.contains_key(*key) {
404            return Err(ProfileValidationError::new(format!(
405                "missing required field: {key}"
406            )));
407        }
408    }
409    let event: ProfileEvent = serde_json::from_value(value)
410        .map_err(|err| ProfileValidationError::new(format!("invalid profile event: {err}")))?;
411    event.validate()?;
412    Ok(event)
413}
414
415/// Parse a profile JSONL payload and validate every non-blank line.
416pub fn parse_profile_jsonl_str(input: &str) -> Result<Vec<ProfileEvent>, ProfileValidationError> {
417    let mut events = Vec::new();
418    for (line_idx, line) in input.lines().enumerate() {
419        let line_no = line_idx + 1;
420        let line = line.trim();
421        if line.is_empty() {
422            continue;
423        }
424        let value: Value = serde_json::from_str(line)
425            .map_err(|err| ProfileValidationError::line(line_no, format!("invalid JSON: {err}")))?;
426        let event = parse_profile_event_value(value)
427            .map_err(|err| ProfileValidationError::line(line_no, err.to_string()))?;
428        events.push(event);
429    }
430    Ok(events)
431}
432
433/// Ensure that a profile contains at least one event for every required group.
434pub fn require_profile_event_groups(
435    events: &[ProfileEvent],
436    required: &[&str],
437) -> Result<(), ProfileValidationError> {
438    let mut missing = Vec::new();
439    for required_event in required {
440        if !events.iter().any(|event| event.event == *required_event) {
441            missing.push(*required_event);
442        }
443    }
444    if missing.is_empty() {
445        Ok(())
446    } else {
447        Err(ProfileValidationError::new(format!(
448            "missing profile event groups: {}",
449            missing.join(", ")
450        )))
451    }
452}
453
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated, valid event as raw JSON; tests mutate copies of it.
    fn valid_event_value() -> Value {
        serde_json::json!({
            "event": "unified_prof",
            "commit_sha": "abc123",
            "env_hash": "sha256:env",
            "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
            "concurrency": 32,
            "shape": {"batch": 32, "active_blocks": 65},
            "stage_us": {"model": 12500.0, "decode_post": 350.0},
            "graph_enabled": true,
            "runtime_flags": {
                "schema_version": 1,
                "entries": []
            },
            "source": "server_log",
            "source_line": "[unified-prof] model=12500 decode_post=350"
        })
    }

    #[test]
    fn profile_event_round_trips_and_validates() {
        let event = parse_profile_event_value(valid_event_value()).expect("valid event");
        event.validate().expect("valid schema");
        assert_eq!(event.event, "unified_prof");
        assert_eq!(event.concurrency, 32);
        assert_eq!(
            event.shape.get("active_blocks").and_then(Value::as_i64),
            Some(65)
        );

        let encoded = serde_json::to_string(&event).expect("serialize");
        let decoded = parse_profile_event_value(serde_json::from_str(&encoded).unwrap())
            .expect("decode serialized event");
        assert_eq!(decoded, event);
    }

    #[test]
    fn profile_event_rejects_every_missing_required_field() {
        for key in REQUIRED_PROFILE_EVENT_FIELDS {
            let mut event = valid_event_value();
            event.as_object_mut().unwrap().remove(*key);
            let err = parse_profile_event_value(event).unwrap_err();
            assert!(
                err.message()
                    .contains(&format!("missing required field: {key}")),
                "unexpected error for missing {key}: {}",
                err.message()
            );
        }
    }

    #[test]
    fn profile_event_accepts_null_commit_sha_when_key_is_present() {
        let mut event = valid_event_value();
        event["commit_sha"] = Value::Null;
        let parsed = parse_profile_event_value(event).expect("null commit_sha is allowed");
        assert_eq!(parsed.commit_sha, None);
    }

    #[test]
    fn profile_event_rejects_bad_env_hash_and_zero_concurrency() {
        let mut bad_hash = valid_event_value();
        bad_hash["env_hash"] = Value::String("env".to_string());
        assert!(parse_profile_event_value(bad_hash)
            .unwrap_err()
            .message()
            .contains("env_hash"));

        let mut zero_concurrency = valid_event_value();
        zero_concurrency["concurrency"] = Value::Number(0.into());
        assert!(parse_profile_event_value(zero_concurrency)
            .unwrap_err()
            .message()
            .contains("concurrency"));
    }

    #[test]
    fn profile_jsonl_parses_multiple_events() {
        let one = serde_json::to_string(&valid_event_value()).unwrap();
        let mut two_value = valid_event_value();
        two_value["event"] = Value::String("bucket_prof".to_string());
        let two = serde_json::to_string(&two_value).unwrap();

        let events = parse_profile_jsonl_str(&format!("{one}\n\n{two}\n")).unwrap();
        assert_eq!(events.len(), 2);
        require_profile_event_groups(&events, &["unified_prof", "bucket_prof"]).unwrap();
    }

    #[test]
    fn required_event_groups_reject_missing_groups() {
        let event = parse_profile_event_value(valid_event_value()).unwrap();
        let err = require_profile_event_groups(&[event], &["unified_prof", "bucket_prof"])
            .expect_err("missing bucket profile");
        assert!(err.message().contains("bucket_prof"));
    }

    #[test]
    fn profile_parser_covers_three_fixture_artifact_shapes() {
        let default_graph_on = [
            serde_json::json!({
                "event": "graph_prof",
                "commit_sha": "abc123",
                "env_hash": "sha256:graph",
                "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
                "concurrency": 32,
                "shape": {"call": 64},
                "stage_us": {"upload": 110, "launch": 240, "sync": 0, "total": 350},
                "graph_enabled": true,
                "runtime_flags": {"preset": "m3_qwen3_30b_a3b_int4"}
            }),
            serde_json::json!({
                "event": "unified_prof",
                "commit_sha": "abc123",
                "env_hash": "sha256:graph",
                "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
                "concurrency": 32,
                "shape": {"items": 32, "decode": 32},
                "stage_us": {"total": 14900, "model": 14000, "decode_post": 600},
                "graph_enabled": true,
                "runtime_flags": {"preset": "m3_qwen3_30b_a3b_int4"}
            }),
        ];
        let graph_off_route_dump = [
            serde_json::json!({
                "event": "moe_dump",
                "commit_sha": "abc123",
                "env_hash": "sha256:route",
                "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
                "concurrency": 32,
                "shape": {"batch_x_topk": 256, "active_blocks": 65, "unique_experts": 61},
                "stage_us": {},
                "graph_enabled": false,
                "runtime_flags": {"FERRUM_MOE_GRAPH": "0"}
            }),
            serde_json::json!({
                "event": "vllm_moe_config",
                "commit_sha": "abc123",
                "env_hash": "sha256:route",
                "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
                "concurrency": 32,
                "shape": {"batch_x_topk": 256, "prob_m": 32, "thread_k": 64, "thread_n": 128},
                "stage_us": {},
                "graph_enabled": false,
                "runtime_flags": {"FERRUM_MOE_GRAPH": "0"}
            }),
            serde_json::json!({
                "event": "bucket_prof",
                "commit_sha": "abc123",
                "env_hash": "sha256:route",
                "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
                "concurrency": 32,
                "shape": {"layers": 48},
                "stage_us": {"gemm1": 5860, "gemm3": 2890, "combine": 250},
                "graph_enabled": false,
                "runtime_flags": {"FERRUM_MOE_PROFILE": "1"}
            }),
        ];
        let fa_layout_attention_ab = [
            serde_json::json!({
                "event": "unified_layer_prof",
                "commit_sha": "abc123",
                "env_hash": "sha256:fa",
                "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
                "concurrency": 32,
                "shape": {"m": 128, "seqs": 4, "sampled": 4},
                "stage_us": {"attn": 46000, "moe": 36000, "layer_sum": 91000},
                "graph_enabled": false,
                "runtime_flags": {"FERRUM_FA_LAYOUT_VARLEN": "1"}
            }),
            serde_json::json!({
                "event": "unified_prof",
                "commit_sha": "abc123",
                "env_hash": "sha256:fa",
                "model": "Qwen/Qwen3-30B-A3B-GPTQ-Int4",
                "concurrency": 32,
                "shape": {"items": 32, "prefill": 4, "decode": 28},
                "stage_us": {"total": 61000, "model": 59000, "decode_post": 350},
                "graph_enabled": false,
                "runtime_flags": {"FERRUM_FA_LAYOUT_VARLEN": "1"}
            }),
        ];

        for (fixture, required) in [
            (&default_graph_on[..], &["graph_prof", "unified_prof"][..]),
            (
                &graph_off_route_dump[..],
                &["moe_dump", "vllm_moe_config", "bucket_prof"][..],
            ),
            (
                &fa_layout_attention_ab[..],
                &["unified_layer_prof", "unified_prof"][..],
            ),
        ] {
            let jsonl = fixture
                .iter()
                .map(|value| serde_json::to_string(value).unwrap())
                .collect::<Vec<_>>()
                .join("\n");
            let events = parse_profile_jsonl_str(&jsonl).unwrap();
            require_profile_event_groups(&events, required).unwrap();
        }
    }

    #[test]
    fn disabled_profile_writer_is_noop() {
        let writer = ProfileJsonlWriter::disabled();
        assert!(!writer.is_enabled());
        writer
            .push_event(
                "iter_prof",
                profile_fields_from_json(serde_json::json!({"batch_size": 1})),
                profile_fields_from_json(serde_json::json!({"total": 10})),
                false,
            )
            .unwrap();
        assert_eq!(writer.path(), None);
    }

    #[test]
    fn enabled_profile_writer_writes_valid_jsonl() {
        let dir = tempdir();
        let path = dir.join("profile.jsonl");
        let writer = ProfileJsonlWriter::enabled(
            path.clone(),
            ProfileMetadata {
                commit_sha: Some("abc123".to_string()),
                env_hash: "sha256:env".to_string(),
                model: "model".to_string(),
                concurrency: 32,
                runtime_flags: serde_json::json!({"schema_version": 1}),
            },
        )
        .unwrap();

        writer
            .push_event(
                "bucket_prof",
                profile_fields_from_json(serde_json::json!({"layers": 48})),
                profile_fields_from_json(serde_json::json!({"gemm1": 1200, "gemm3": 800})),
                true,
            )
            .unwrap();
        writer.flush().unwrap();

        let jsonl = std::fs::read_to_string(&path).unwrap();
        let events = parse_profile_jsonl_str(&jsonl).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event, "bucket_prof");
        assert_eq!(events[0].concurrency, 32);
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn typed_profile_sink_config_builds_writer() {
        let dir = tempdir();
        let path = dir.join("typed-profile.jsonl");
        let config = ProfileSinkConfig::enabled(
            path.clone(),
            ProfileMetadata {
                commit_sha: Some("def456".to_string()),
                env_hash: "sha256:typed".to_string(),
                model: "typed-model".to_string(),
                concurrency: 16,
                runtime_flags: serde_json::json!({"source": "typed"}),
            },
        );
        let writer = ProfileJsonlWriter::from_config(config).unwrap();
        assert!(writer.is_enabled());
        assert_eq!(writer.path().as_deref(), Some(path.as_path()));

        writer
            .push_event(
                "unified_prof",
                profile_fields_from_json(serde_json::json!({"items": 16})),
                profile_fields_from_json(serde_json::json!({"total": 1000})),
                false,
            )
            .unwrap();
        writer.flush().unwrap();

        let jsonl = std::fs::read_to_string(&path).unwrap();
        let events = parse_profile_jsonl_str(&jsonl).unwrap();
        assert_eq!(events[0].commit_sha.as_deref(), Some("def456"));
        assert_eq!(events[0].env_hash, "sha256:typed");
        assert_eq!(events[0].model, "typed-model");
        assert_eq!(events[0].concurrency, 16);
        assert_eq!(events[0].runtime_flags["source"], "typed");
        let _ = std::fs::remove_dir_all(&dir);
    }

    #[test]
    fn profile_metadata_parses_env_snapshot() {
        let metadata = ProfileMetadata::from_env_vars([
            (PROFILE_COMMIT_SHA_ENV, "abc123"),
            (PROFILE_ENV_HASH_ENV, "sha256:env"),
            (PROFILE_MODEL_ENV, "model"),
            (PROFILE_CONCURRENCY_ENV, "32"),
            (PROFILE_RUNTIME_FLAGS_JSON_ENV, r#"{"fa_layout":true}"#),
        ]);

        assert_eq!(metadata.commit_sha.as_deref(), Some("abc123"));
        assert_eq!(metadata.env_hash, "sha256:env");
        assert_eq!(metadata.model, "model");
        assert_eq!(metadata.concurrency, 32);
        assert_eq!(metadata.runtime_flags["fa_layout"], true);
    }

    #[test]
    fn profile_sink_config_parses_env_snapshot() {
        let config = ProfileSinkConfig::from_env_vars([
            (PROFILE_JSONL_ENV, "/tmp/profile.jsonl"),
            (PROFILE_ENV_HASH_ENV, "sha256:env"),
            (PROFILE_MODEL_ENV, "model"),
        ]);

        assert_eq!(
            config.jsonl_path.as_deref(),
            Some(std::path::Path::new("/tmp/profile.jsonl"))
        );
        assert_eq!(config.metadata.env_hash, "sha256:env");
        assert_eq!(config.metadata.model, "model");

        let disabled = ProfileSinkConfig::from_env_vars([(PROFILE_JSONL_ENV, "")]);
        assert_eq!(disabled.jsonl_path, None);
    }

    /// Creates a fresh scratch directory under the system temp dir.
    ///
    /// The name combines the process id, a timestamp, and a per-process
    /// counter. A timestamp alone is not unique: tests run in parallel, and
    /// on a coarse clock two of them could get the same directory and then
    /// delete each other's files during cleanup.
    fn tempdir() -> std::path::PathBuf {
        use std::sync::atomic::{AtomicU64, Ordering};

        static NEXT_ID: AtomicU64 = AtomicU64::new(0);

        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let d = std::env::temp_dir().join(format!(
            "ferrum-profile-test-{}-{}-{}",
            std::process::id(),
            nanos,
            NEXT_ID.fetch_add(1, Ordering::Relaxed)
        ));
        std::fs::create_dir_all(&d).unwrap();
        d
    }
}
795}