Skip to main content

ai_agents_eval/
runner.rs

1use std::collections::{HashMap, HashSet};
2use std::path::{Path, PathBuf};
3use std::sync::{Arc, Mutex, MutexGuard, OnceLock};
4use std::time::Instant;
5
6use ai_agents_observability::ObservabilityConfig;
7use ai_agents_observability::config::ExportFormat;
8use ai_agents_runtime::spec::{AgentSpec, LLMConfigOrSelector, StorageConfig};
9use ai_agents_runtime::{Agent, AgentBuilder, RuntimeAgent, StreamChunk};
10use futures::{StreamExt, stream};
11use serde_json::{Value, json};
12use tokio::time::{Duration, timeout};
13
14use crate::assertion::{AssertionEvalContext, AssertionOutcome, evaluate_assertion};
15use crate::compatibility::suite_from_jsonl;
16use crate::evidence::{collect_turn_evidence, relationship_snapshot};
17use crate::fixtures::{
18    LlmFixtureMode, RecordingToolLog, build_llm_registry, build_tool_registry,
19    resolve_fixture_context, start_mock_server,
20};
21use crate::judge::{JudgeConfig, JudgeResolver};
22use crate::metrics::compute_metrics;
23use crate::redaction::{redact_text, redact_value};
24use crate::suite::{
25    AttemptResult, EvalResult, EvalSuite, FailureCategory, IsolationMode, ResetStepConfig,
26    Scenario, ScenarioResult, ScenarioStatus, ScenarioStep, Turn, TurnResult,
27};
28use crate::{EvalError, Result};
29
30/// Runtime options supplied by CLI or Rust callers.
31#[derive(Debug, Clone, Default)]
32pub struct EvalRunnerOptions {
33    /// Agent YAML path used for this run.
34    pub agent: Option<PathBuf>,
35    /// Scenario test cases in this suite.
36    pub scenarios: Option<PathBuf>,
37    /// Directory where output artifacts are written.
38    pub output: PathBuf,
39    /// Scenario IDs selected for execution.
40    pub ids: Vec<String>,
41    /// Tags used by filters and grouped metrics.
42    pub tags: Vec<String>,
43    /// Whether all selected tags must match.
44    pub tag_mode_all: bool,
45    /// Language labels selected for execution.
46    pub languages: Vec<String>,
47    /// Optional retry count or suite retry count.
48    pub retries: Option<u32>,
49    /// Optional timeout override for this turn.
50    pub timeout_ms: Option<u64>,
51    /// Optional scenario concurrency override.
52    pub parallel: Option<usize>,
53    /// Stop after the first failed or errored scenario.
54    pub fail_fast: bool,
55    /// Observability assertion, setting, or report value.
56    pub observability: bool,
57    /// Optional LLM fixture mode override.
58    pub llm_mode: Option<LlmFixtureMode>,
59    /// Optional cassette JSONL file for replay or record mode.
60    pub cassette: Option<PathBuf>,
61}
62
63/// Parsed suite runner with immutable options and suite state.
64pub struct EvalRunner {
65    /// Path to the loaded suite file.
66    suite_path: PathBuf,
67    /// Parsed and validated suite.
68    suite: EvalSuite,
69    /// Runtime options applied to the suite.
70    options: EvalRunnerOptions,
71}
72
73impl EvalRunner {
74    pub fn from_file(path: impl AsRef<Path>, options: EvalRunnerOptions) -> Result<Self> {
75        let path = path.as_ref().to_path_buf();
76        let content = std::fs::read_to_string(&path)?;
77        let mut suite = if path.extension().and_then(|e| e.to_str()) == Some("jsonl") {
78            suite_from_jsonl(
79                path.file_stem()
80                    .and_then(|s| s.to_str())
81                    .unwrap_or("eval")
82                    .to_string(),
83                &content,
84            )?
85        } else {
86            serde_yaml::from_str::<EvalSuite>(&content)?
87        };
88        if let Some(agent) = &options.agent {
89            suite.agent = Some(agent.clone());
90        }
91        if let Some(retries) = options.retries {
92            suite.settings.retries = retries;
93        }
94        if let Some(timeout_ms) = options.timeout_ms {
95            suite.settings.timeout_per_turn_ms = timeout_ms;
96        }
97        if let Some(parallel) = options.parallel {
98            suite.settings.parallel = parallel > 1;
99            suite.settings.max_concurrent = parallel.max(1);
100        }
101        if options.fail_fast {
102            suite.settings.fail_fast = true;
103        }
104        if let Some(mode) = options.llm_mode {
105            suite.fixtures.llm.mode = mode;
106        }
107        if let Some(cassette) = &options.cassette {
108            suite.fixtures.llm.cassette = Some(cassette.clone());
109        }
110        suite.validate(options.agent.as_ref())?;
111        Ok(Self {
112            suite_path: path,
113            suite,
114            options,
115        })
116    }
117
118    pub async fn run(&self) -> Result<EvalResult> {
119        let start = Instant::now();
120        let base_dir = self.suite_path.parent().unwrap_or_else(|| Path::new("."));
121        let agent_path = self.resolve_agent_path(base_dir)?;
122        let scenarios = self.filtered_scenarios();
123        let results = if self.suite.settings.parallel && !self.suite.settings.fail_fast {
124            self.run_scenarios_parallel(&agent_path, base_dir, scenarios)
125                .await
126        } else {
127            self.run_scenarios_serial(&agent_path, base_dir, scenarios)
128                .await
129        };
130
131        let total = results.len();
132        let passed = results.iter().filter(|r| r.status.is_passed()).count();
133        let failed = results
134            .iter()
135            .filter(|r| r.status.is_failed() || r.status.is_error())
136            .count();
137        let skipped = results
138            .iter()
139            .filter(|r| matches!(r.status, ScenarioStatus::Skipped { .. }))
140            .count();
141        let metrics = compute_metrics(&results);
142
143        let observability = final_observability_report(&results);
144
145        Ok(EvalResult {
146            schema_version: 1,
147            suite: self.suite.name.clone(),
148            agent: agent_path.display().to_string(),
149            total,
150            passed,
151            failed,
152            skipped,
153            duration_ms: start.elapsed().as_millis() as u64,
154            scenarios: results,
155            metrics,
156            observability,
157        })
158    }
159
160    async fn run_scenarios_serial(
161        &self,
162        agent_path: &Path,
163        base_dir: &Path,
164        scenarios: Vec<&Scenario>,
165    ) -> Vec<ScenarioResult> {
166        let mut results = Vec::new();
167        for scenario in scenarios {
168            let result = self.run_scenario(agent_path, base_dir, scenario).await;
169            match result {
170                Ok(result) => {
171                    let stop = self.suite.settings.fail_fast
172                        && (result.status.is_failed() || result.status.is_error());
173                    results.push(result);
174                    if stop {
175                        break;
176                    }
177                }
178                Err(error) => {
179                    results.push(error_result(scenario, error, FailureCategory::RuntimeError));
180                    if self.suite.settings.fail_fast {
181                        break;
182                    }
183                }
184            }
185        }
186        results
187    }
188
189    async fn run_scenarios_parallel(
190        &self,
191        agent_path: &Path,
192        base_dir: &Path,
193        scenarios: Vec<&Scenario>,
194    ) -> Vec<ScenarioResult> {
195        let max_concurrent = self.suite.settings.max_concurrent.max(1);
196        let mut indexed = stream::iter(scenarios.into_iter().enumerate())
197            .map(|(idx, scenario)| async move {
198                let result = self.run_scenario(agent_path, base_dir, scenario).await;
199                let result = result.unwrap_or_else(|error| {
200                    error_result(scenario, error, FailureCategory::RuntimeError)
201                });
202                (idx, result)
203            })
204            .buffer_unordered(max_concurrent)
205            .collect::<Vec<_>>()
206            .await;
207        indexed.sort_by_key(|(idx, _)| *idx);
208        indexed.into_iter().map(|(_, result)| result).collect()
209    }
210
211    fn resolve_agent_path(&self, base_dir: &Path) -> Result<PathBuf> {
212        if let Some(agent) = &self.options.agent {
213            return Ok(agent.clone());
214        }
215        let agent =
216            self.suite.agent.clone().ok_or_else(|| {
217                EvalError::Config("agent path is required in suite or CLI".into())
218            })?;
219        Ok(if agent.is_absolute() {
220            agent
221        } else {
222            base_dir.join(agent)
223        })
224    }
225
226    fn filtered_scenarios(&self) -> Vec<&Scenario> {
227        let ids: HashSet<_> = self.options.ids.iter().collect();
228        let tags: HashSet<_> = self.options.tags.iter().collect();
229        let languages: HashSet<_> = self.options.languages.iter().collect();
230        self.suite
231            .scenarios
232            .iter()
233            .filter(|scenario| {
234                if !ids.is_empty() && !ids.contains(&scenario.id) {
235                    return false;
236                }
237                if !languages.is_empty() {
238                    let Some(language) = &scenario.language else {
239                        return false;
240                    };
241                    if !languages.contains(language) {
242                        return false;
243                    }
244                }
245                if !tags.is_empty() {
246                    let scenario_tags: HashSet<_> = scenario.tags.iter().collect();
247                    if self.options.tag_mode_all {
248                        if !tags.iter().all(|tag| scenario_tags.contains(*tag)) {
249                            return false;
250                        }
251                    } else if !tags.iter().any(|tag| scenario_tags.contains(*tag)) {
252                        return false;
253                    }
254                }
255                true
256            })
257            .collect()
258    }
259
260    async fn run_scenario(
261        &self,
262        agent_path: &Path,
263        base_dir: &Path,
264        scenario: &Scenario,
265    ) -> Result<ScenarioResult> {
266        let start = Instant::now();
267        if scenario.skip.is_skipped() {
268            return Ok(ScenarioResult {
269                id: scenario.id.clone(),
270                name: scenario.name.clone(),
271                tags: scenario.tags.clone(),
272                language: scenario.language.clone(),
273                status: ScenarioStatus::Skipped {
274                    reason: scenario.skip.reason(),
275                },
276                failure_category: None,
277                flaky: false,
278                attempts: Vec::new(),
279                duration_ms: 0,
280                retries_used: 0,
281            });
282        }
283
284        let mut attempts = Vec::new();
285        let mut final_status = ScenarioStatus::Failed {
286            reason: "not run".to_string(),
287        };
288        let mut category = Some(FailureCategory::AssertionFailed);
289        let max_attempt = self.suite.settings.retries + 1;
290
291        for attempt_idx in 0..max_attempt {
292            let attempt_future = self.run_attempt(agent_path, base_dir, scenario, attempt_idx);
293            let attempt = if let Some(timeout_ms) = self.suite.settings.timeout_per_scenario_ms {
294                match timeout(Duration::from_millis(timeout_ms), attempt_future).await {
295                    Ok(result) => result,
296                    Err(_) => Err(EvalError::Runtime(format!(
297                        "scenario '{}' attempt {} timed out after {}ms",
298                        scenario.id, attempt_idx, timeout_ms
299                    ))),
300                }
301            } else {
302                attempt_future.await
303            };
304            match attempt {
305                Ok(attempt_result) => {
306                    final_status = attempt_result.status.clone();
307                    if final_status.is_passed() {
308                        attempts.push(attempt_result);
309                        category = if attempt_idx > 0 {
310                            Some(FailureCategory::FlakyPass)
311                        } else {
312                            None
313                        };
314                        break;
315                    }
316                    category = Some(if final_status.is_error() {
317                        FailureCategory::RuntimeError
318                    } else {
319                        failure_category_for_attempt(&attempt_result)
320                    });
321                    attempts.push(attempt_result);
322                }
323                Err(error) => {
324                    final_status = ScenarioStatus::Error {
325                        message: error.to_string(),
326                    };
327                    category = Some(FailureCategory::RuntimeError);
328                    attempts.push(AttemptResult {
329                        attempt: attempt_idx,
330                        turns: Vec::new(),
331                        status: final_status.clone(),
332                        duration_ms: 0,
333                    });
334                }
335            }
336            if attempt_idx + 1 < max_attempt {
337                tokio::time::sleep(Duration::from_millis(self.suite.settings.retry_delay_ms)).await;
338            }
339        }
340
341        let flaky = final_status.is_passed() && attempts.len() > 1;
342        Ok(ScenarioResult {
343            id: scenario.id.clone(),
344            name: scenario.name.clone(),
345            tags: scenario.tags.clone(),
346            language: scenario.language.clone(),
347            status: final_status,
348            failure_category: category,
349            flaky,
350            duration_ms: start.elapsed().as_millis() as u64,
351            retries_used: attempts.len().saturating_sub(1) as u32,
352            attempts,
353        })
354    }
355
356    async fn run_attempt(
357        &self,
358        agent_path: &Path,
359        base_dir: &Path,
360        scenario: &Scenario,
361        attempt: u32,
362    ) -> Result<AttemptResult> {
363        let start = Instant::now();
364        let workspace = std::env::temp_dir().join(format!(
365            "ai_agents_eval_{}_{}_{}",
366            scenario.id,
367            attempt,
368            uuid::Uuid::new_v4()
369        ));
370        std::fs::create_dir_all(&workspace)?;
371        let _env_guard = EnvGuard::apply(&scenario.env)?;
372        let mock_server = start_mock_server(self.suite.fixtures.mock_server.as_ref()).await?;
373        let tool_log = RecordingToolLog::new();
374        let mut agent = self
375            .build_agent(agent_path, base_dir, &workspace, tool_log.clone())
376            .await?;
377        apply_base_context(
378            &agent,
379            &self.suite,
380            base_dir,
381            scenario,
382            mock_server.as_ref(),
383        )?;
384        let mut turns = Vec::new();
385        let mut status = ScenarioStatus::Passed;
386
387        if !scenario.turns.is_empty() {
388            for (idx, turn) in scenario.turns.iter().enumerate() {
389                let turn_result = self
390                    .run_turn(&agent, scenario, turn, idx, &tool_log)
391                    .await?;
392                let turn_failed = turn_result.assertion_results.iter().any(|r| !r.passed);
393                turns.push(turn_result);
394                if turn_failed {
395                    status = ScenarioStatus::Failed {
396                        reason: format!("turn {} assertion failed", idx + 1),
397                    };
398                    break;
399                }
400                if self.suite.settings.isolation == IsolationMode::Turn
401                    && idx + 1 < scenario.turns.len()
402                {
403                    agent.reset().await?;
404                    apply_base_context(
405                        &agent,
406                        &self.suite,
407                        base_dir,
408                        scenario,
409                        mock_server.as_ref(),
410                    )?;
411                }
412            }
413        }
414
415        for step in &scenario.steps {
416            if !status.is_passed() {
417                break;
418            }
419            match step {
420                ScenarioStep::Run(run) => {
421                    for turn in &run.turns {
422                        let idx = turns.len();
423                        let turn_result = self
424                            .run_turn(&agent, scenario, turn, idx, &tool_log)
425                            .await?;
426                        let turn_failed = turn_result.assertion_results.iter().any(|r| !r.passed);
427                        turns.push(turn_result);
428                        if turn_failed {
429                            status = ScenarioStatus::Failed {
430                                reason: format!("turn {} assertion failed", idx + 1),
431                            };
432                            break;
433                        }
434                    }
435                    if let Some(session) = &run.save_session {
436                        agent.save_session(session).await?;
437                    }
438                }
439                ScenarioStep::ResetAgent(reset) => {
440                    if let Some(options) = reset_options(reset) {
441                        if options.delete_persistence || !options.preserve_storage {
442                            let _ = std::fs::remove_dir_all(&workspace);
443                            std::fs::create_dir_all(&workspace)?;
444                        }
445                        let preserved_actor = options
446                            .preserve_actor_id
447                            .then(|| agent.actor_id())
448                            .flatten();
449                        if matches!(options.profile, crate::reset::ResetProfile::Conversation)
450                            && !options.delete_persistence
451                        {
452                            agent.reset().await?;
453                        } else {
454                            agent = self
455                                .build_agent(agent_path, base_dir, &workspace, tool_log.clone())
456                                .await?;
457                        }
458                        if options.preserve_host_context {
459                            apply_base_context(
460                                &agent,
461                                &self.suite,
462                                base_dir,
463                                scenario,
464                                mock_server.as_ref(),
465                            )?;
466                        }
467                        if let Some(actor) = preserved_actor.or_else(|| scenario.actor.clone()) {
468                            agent.set_actor_id(&actor)?;
469                            agent.load_actor_memory().await?;
470                            agent.load_actor_relationship().await?;
471                        }
472                    }
473                }
474                ScenarioStep::SaveSession(name) => {
475                    agent.save_session(name).await?;
476                }
477                ScenarioStep::LoadSession(name) => {
478                    let _ = agent.load_session(name).await?;
479                }
480                ScenarioStep::SetContext { values } => {
481                    apply_context_value(&agent, values)?;
482                }
483                ScenarioStep::SetActor { actor } => {
484                    agent.set_actor_id(actor)?;
485                    agent.load_actor_memory().await?;
486                    agent.load_actor_relationship().await?;
487                }
488                ScenarioStep::CleanupExpired => {
489                    let _ = agent.cleanup_expired_sessions().await?;
490                }
491            }
492        }
493
494        Ok(AttemptResult {
495            attempt,
496            turns,
497            status,
498            duration_ms: start.elapsed().as_millis() as u64,
499        })
500    }
501
502    async fn build_agent(
503        &self,
504        agent_path: &Path,
505        base_dir: &Path,
506        workspace: &Path,
507        tool_log: RecordingToolLog,
508    ) -> Result<RuntimeAgent> {
509        let content = std::fs::read_to_string(agent_path)?;
510        let mut spec: AgentSpec = serde_yaml::from_str(&content)?;
511        apply_eval_llm_settings(&mut spec, &self.suite.settings);
512        spec.validate()
513            .map_err(|error| EvalError::Config(error.to_string()))?;
514        let (llm_registry, _judge_llm) =
515            build_llm_registry(&spec, &self.suite.fixtures.llm, base_dir)?;
516        let tool_registry = build_tool_registry(&self.suite.fixtures, tool_log)?;
517        let mut builder = AgentBuilder::from_yaml_file(agent_path)
518            .map_err(|error| EvalError::Config(error.to_string()))?
519            .llm_registry(llm_registry)
520            .tools(tool_registry)
521            .auto_configure_features()
522            .map_err(|error| EvalError::Config(error.to_string()))?
523            .auto_configure_mcp()
524            .await
525            .map_err(|error| EvalError::Config(error.to_string()))?;
526
527        let storage_override = isolated_storage_config(&spec, workspace);
528        if let Some(storage) = storage_override {
529            builder = builder.storage_config(storage);
530        }
531
532        if let Some(observability) = self.observability_config(base_dir)? {
533            let manager = ai_agents_observability::ObservabilityManager::new(observability);
534            builder = builder.observability(manager);
535        }
536        builder = builder
537            .auto_configure_spawner()
538            .await
539            .map_err(|error| EvalError::Config(error.to_string()))?;
540        let agent = builder
541            .build()
542            .map_err(|error| EvalError::Config(error.to_string()))?;
543        agent.init_storage().await?;
544        Ok(agent)
545    }
546
547    fn observability_config(&self, base_dir: &Path) -> Result<Option<ObservabilityConfig>> {
548        let mut config = if let Some(config) = self.suite.observability.clone() {
549            config
550        } else if self.options.observability {
551            let mut config = ObservabilityConfig::default();
552            config.enabled = true;
553            config.export.formats = vec![ExportFormat::Json];
554            config.export.path = self
555                .options
556                .output
557                .join("observability")
558                .display()
559                .to_string();
560            config.export.write_report = true;
561            config
562        } else {
563            return Ok(None);
564        };
565        if !config.enabled {
566            return Ok(None);
567        }
568        config = config
569            .with_pricing_file_loaded(Some(base_dir))
570            .map_err(|error| EvalError::Config(error.to_string()))?;
571        config
572            .validate()
573            .map_err(|error| EvalError::Config(error.to_string()))?;
574        Ok(Some(config))
575    }
576
577    async fn run_turn(
578        &self,
579        agent: &RuntimeAgent,
580        scenario: &Scenario,
581        turn: &Turn,
582        index: usize,
583        tool_log: &RecordingToolLog,
584    ) -> Result<TurnResult> {
585        apply_context_value(agent, &turn.context)?;
586        if let Some(actor) = &turn.actor {
587            agent.set_actor_id(actor)?;
588        }
589        let before_relationship = relationship_snapshot(agent);
590        let tool_start = tool_log.len();
591        let start = Instant::now();
592        let timeout_ms = turn
593            .timeout_ms
594            .unwrap_or(self.suite.settings.timeout_per_turn_ms);
595        let (response_content, response_metadata) = if turn.stream.unwrap_or(false) {
596            timeout(
597                Duration::from_millis(timeout_ms),
598                collect_stream_response(agent, &turn.input),
599            )
600            .await
601            .map_err(|_| EvalError::Runtime(format!("turn timed out after {}ms", timeout_ms)))??
602        } else {
603            let response = timeout(Duration::from_millis(timeout_ms), agent.chat(&turn.input))
604                .await
605                .map_err(|_| {
606                    EvalError::Runtime(format!("turn timed out after {}ms", timeout_ms))
607                })??;
608            (response.content, response.metadata)
609        };
610        agent.flush_background_tasks().await?;
611        let latency_ms = start.elapsed().as_millis() as u64;
612        let evidence = collect_turn_evidence(
613            agent,
614            response_metadata.clone(),
615            tool_log,
616            tool_start,
617            before_relationship,
618        );
619        let judge = self.build_judge(agent);
620        let mut assertion_results = if let Some(assertion) = &turn.assertions {
621            match evaluate_assertion(
622                assertion,
623                AssertionEvalContext {
624                    evidence: &evidence,
625                    response: &response_content,
626                    user_input: Some(&turn.input),
627                    scenario_id: Some(&scenario.id),
628                    language: scenario.language.as_deref(),
629                    judge_resolver: Some(&judge),
630                },
631            )
632            .await
633            {
634                AssertionOutcome::Passed(details) | AssertionOutcome::Failed(details) => details,
635                AssertionOutcome::Error(message) => return Err(EvalError::Assertion(message)),
636            }
637        } else {
638            Vec::new()
639        };
640        if self.suite.settings.redact_outputs {
641            redact_assertion_details(&mut assertion_results);
642        }
643        let observability_span_id = evidence
644            .observability
645            .as_ref()
646            .and_then(|obs| obs.span_ids.last().cloned());
647        Ok(TurnResult {
648            index,
649            input: redact_text(&turn.input, self.suite.settings.redact_outputs, 0),
650            response: redact_text(&response_content, self.suite.settings.redact_outputs, 0),
651            state: evidence.state.clone(),
652            metadata: if self.suite.settings.redact_outputs {
653                None
654            } else {
655                response_metadata.and_then(|m| serde_json::to_value(m).ok())
656            },
657            evidence,
658            assertion_results,
659            latency_ms,
660            observability_span_id,
661        })
662    }
663
664    fn build_judge(&self, agent: &RuntimeAgent) -> JudgeResolver {
665        JudgeResolver::new(Arc::clone(agent.llm_registry()), JudgeConfig::default())
666    }
667}
668
669async fn collect_stream_response(
670    agent: &RuntimeAgent,
671    input: &str,
672) -> Result<(String, Option<HashMap<String, Value>>)> {
673    let mut stream = agent.chat_stream(input).await?;
674    let mut content = String::new();
675    while let Some(chunk) = stream.next().await {
676        match chunk {
677            StreamChunk::Content { text } => content.push_str(&text),
678            StreamChunk::Done {} => break,
679            StreamChunk::Error { message } => return Err(EvalError::Runtime(message)),
680            _ => {}
681        }
682    }
683    Ok((content, None))
684}
685
686fn isolated_storage_config(spec: &AgentSpec, workspace: &Path) -> Option<StorageConfig> {
687    if spec.storage.is_none() {
688        return None;
689    }
690    match &spec.storage {
691        StorageConfig::Sqlite(_) => Some(StorageConfig::sqlite(
692            workspace.join("sessions.db").display().to_string(),
693        )),
694        StorageConfig::File(_) => Some(StorageConfig::file(
695            workspace.join("sessions").display().to_string(),
696        )),
697        StorageConfig::Redis(_) => None,
698        StorageConfig::None => None,
699    }
700}
701
702fn apply_context_map(agent: &RuntimeAgent, values: HashMap<String, Value>) -> Result<()> {
703    for (key, value) in values {
704        agent.set_context(&key, value)?;
705    }
706    Ok(())
707}
708
709fn apply_context_value(agent: &RuntimeAgent, value: &Value) -> Result<()> {
710    let Value::Object(map) = value else {
711        return Ok(());
712    };
713    for (key, value) in map {
714        agent.set_context(key, value.clone())?;
715    }
716    Ok(())
717}
718
719fn apply_base_context(
720    agent: &RuntimeAgent,
721    suite: &EvalSuite,
722    base_dir: &Path,
723    scenario: &Scenario,
724    mock_server: Option<&crate::fixtures::MockServerHandle>,
725) -> Result<()> {
726    if let Some(mock_server) = mock_server {
727        apply_context_map(agent, mock_server.context())?;
728    }
729    apply_context_map(agent, resolve_fixture_context(&suite.fixtures, base_dir)?)?;
730    apply_context_value(agent, &scenario.context)?;
731    if let Some(actor) = &scenario.actor {
732        agent.set_actor_id(actor)?;
733    }
734    Ok(())
735}
736
737fn reset_options(config: &ResetStepConfig) -> Option<crate::reset::ResetOptions> {
738    match config {
739        ResetStepConfig::Bool(false) => None,
740        ResetStepConfig::Bool(true) => Some(crate::reset::ResetOptions::default()),
741        ResetStepConfig::Options(options) => Some(options.clone()),
742    }
743}
744
745fn redact_assertion_details(details: &mut [crate::assertion::AssertionResultDetail]) {
746    for detail in details {
747        detail.actual = redact_value(std::mem::take(&mut detail.actual), true, 0);
748        detail.expected = redact_value(std::mem::take(&mut detail.expected), true, 0);
749    }
750}
751
752fn error_result(
753    scenario: &Scenario,
754    error: EvalError,
755    category: FailureCategory,
756) -> ScenarioResult {
757    ScenarioResult {
758        id: scenario.id.clone(),
759        name: scenario.name.clone(),
760        tags: scenario.tags.clone(),
761        language: scenario.language.clone(),
762        status: ScenarioStatus::Error {
763            message: error.to_string(),
764        },
765        failure_category: Some(category),
766        flaky: false,
767        attempts: Vec::new(),
768        duration_ms: 0,
769        retries_used: 0,
770    }
771}
772
773fn failure_category_for_attempt(attempt: &AttemptResult) -> FailureCategory {
774    let judge_failed = attempt.turns.iter().any(|turn| {
775        turn.assertion_results
776            .iter()
777            .any(|detail| !detail.passed && detail.assertion == "judge")
778    });
779    if judge_failed {
780        FailureCategory::JudgeError
781    } else {
782        FailureCategory::AssertionFailed
783    }
784}
785
786fn final_observability_report(
787    results: &[ScenarioResult],
788) -> Option<ai_agents_observability::ObservabilityReport> {
789    results
790        .iter()
791        .rev()
792        .flat_map(|scenario| scenario.attempts.iter().rev())
793        .flat_map(|attempt| attempt.turns.iter().rev())
794        .find_map(|turn| {
795            turn.evidence
796                .observability
797                .as_ref()
798                .and_then(|obs| obs.report.clone())
799        })
800}
801
802fn apply_eval_llm_settings(spec: &mut AgentSpec, settings: &crate::suite::EvalSettings) {
803    if let LLMConfigOrSelector::Config(config) = &mut spec.llm {
804        apply_llm_config_settings(config, settings);
805    }
806    for config in spec.llms.values_mut() {
807        apply_llm_config_settings(config, settings);
808    }
809}
810
811fn apply_llm_config_settings(
812    config: &mut ai_agents_runtime::spec::LLMConfig,
813    settings: &crate::suite::EvalSettings,
814) {
815    if let Some(temperature) = settings.temperature {
816        config.temperature = temperature;
817    }
818    if let Some(seed) = settings.seed {
819        config.extra.insert("seed".to_string(), json!(seed));
820    }
821}
822
823/// Restores process environment variables when an attempt ends.
824struct EnvGuard {
825    /// Previous env values restored on drop.
826    previous: Vec<(String, Option<String>)>,
827    /// Process-wide env lock held for the attempt.
828    _guard: MutexGuard<'static, ()>,
829}
830
831impl EnvGuard {
832    fn apply(values: &HashMap<String, String>) -> Result<Self> {
833        static ENV_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
834        let guard = ENV_LOCK
835            .get_or_init(|| Mutex::new(()))
836            .lock()
837            .map_err(|_| EvalError::Runtime("failed to lock eval environment guard".to_string()))?;
838        let mut previous = Vec::new();
839        for (key, value) in values {
840            previous.push((key.clone(), std::env::var(key).ok()));
841            unsafe {
842                std::env::set_var(key, value);
843            }
844        }
845        Ok(Self {
846            previous,
847            _guard: guard,
848        })
849    }
850}
851
852impl Drop for EnvGuard {
853    fn drop(&mut self) {
854        for (key, value) in self.previous.drain(..).rev() {
855            unsafe {
856                if let Some(value) = value {
857                    std::env::set_var(key, value);
858                } else {
859                    std::env::remove_var(key);
860                }
861            }
862        }
863    }
864}
865
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal agent definition.
    ///
    /// The suite below replaces its LLM with a `mode: mock` fixture.
    const AGENT_YAML: &str = r#"
name: TestAgent
system_prompt: "You are helpful."
llm:
  provider: openai
  model: gpt-4.1-nano
"#;

    /// One-scenario suite whose mocked reply satisfies a `response_contains` assertion.
    const SUITE_YAML: &str = r#"
name: Runner Suite
agent: agent.yaml
fixtures:
  llm:
    mode: mock
    responses:
      - "Hello from mock"
scenarios:
  - id: smoke
    turns:
      - input: Hello
        assert:
          response_contains: "Hello"
"#;

    #[tokio::test]
    async fn runner_executes_mocked_suite_and_redacts_outputs() {
        let scratch = std::env::temp_dir().join(format!(
            "ai_agents_eval_runner_test_{}",
            uuid::Uuid::new_v4()
        ));
        std::fs::create_dir_all(&scratch).unwrap();
        std::fs::write(scratch.join("agent.yaml"), AGENT_YAML).unwrap();
        let suite_file = scratch.join("suite.yaml");
        std::fs::write(&suite_file, SUITE_YAML).unwrap();

        let runner = EvalRunner::from_file(
            &suite_file,
            EvalRunnerOptions {
                output: scratch.join("out"),
                ..Default::default()
            },
        )
        .unwrap();
        let outcome = runner.run().await.unwrap();
        assert_eq!(outcome.passed, 1);

        let first_turn = &outcome.scenarios[0].attempts[0].turns[0];
        assert_eq!(first_turn.input.value, "[redacted]");
        assert_eq!(first_turn.response.value, "[redacted]");

        let serialized = serde_json::to_string(&outcome).unwrap();
        assert!(!serialized.contains("Hello from mock"));
        let _ = std::fs::remove_dir_all(scratch);
    }
}