1use crate::error::EvaluationError;
2use crate::evaluate::store::{AssertionResultStore, LLMResponseStore, TaskRegistry, TaskType};
3use crate::tasks::traits::EvaluationTask;
4use chrono::{DateTime, Utc};
5use scouter_types::genai::traits::ProfileExt;
6use scouter_types::genai::{AssertionResult, ExecutionPlan, GenAIEvalProfile, GenAIEvalSet};
7use scouter_types::GenAIEvalRecord;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use tokio::task::JoinSet;
13use tracing::{debug, error, instrument};
14
/// Shared, cheaply-cloneable state for a single evaluation run.
///
/// Every mutable field is behind `Arc<RwLock<...>>`, so clones handed to
/// spawned tasks all observe the same stores and registry.
#[derive(Debug, Clone)]
struct ExecutionContext {
    // The record's input context; immutable for the lifetime of the run.
    base_context: Arc<Value>,
    // Completed assertion results, keyed by task id (stored with start/end times).
    assertion_store: Arc<RwLock<AssertionResultStore>>,
    // Raw LLM judge responses, keyed by task id.
    llm_response_store: Arc<RwLock<LLMResponseStore>>,
    // Task metadata: type, dependencies, conditional/skipped flags.
    task_registry: Arc<RwLock<TaskRegistry>>,
    // Stage index per task id, precomputed from the execution plan.
    // Read-only after construction, so it is cloned by value rather than shared.
    task_stages: HashMap<String, i32>,
}
23
24impl ExecutionContext {
25 fn new(base_context: Value, registry: TaskRegistry, execution_plan: &ExecutionPlan) -> Self {
26 debug!("Creating ExecutionContext");
27 Self {
28 base_context: Arc::new(base_context),
29 assertion_store: Arc::new(RwLock::new(AssertionResultStore::new())),
30 llm_response_store: Arc::new(RwLock::new(LLMResponseStore::new())),
31 task_registry: Arc::new(RwLock::new(registry)),
32 task_stages: Self::build_task_stages(execution_plan),
33 }
34 }
35
36 fn build_task_stages(execution_plan: &ExecutionPlan) -> HashMap<String, i32> {
37 execution_plan
38 .nodes
39 .iter()
40 .map(|(id, node)| (id.clone(), node.stage as i32))
41 .collect()
42 }
43
44 async fn build_scoped_context(&self, depends_on: &[String]) -> Value {
45 if depends_on.is_empty() {
46 return self.base_context.as_ref().clone();
47 }
48
49 let mut scoped_context = self.build_context_map(&self.base_context);
50 let registry = self.task_registry.read().await;
51
52 for dep_id in depends_on {
53 match registry.get_type(dep_id) {
54 Some(TaskType::Assertion) => {
55 let store = self.assertion_store.read().await;
56 if let Some(result) = store.retrieve(dep_id) {
57 scoped_context.insert(dep_id.clone(), result.2.actual.clone());
58 }
59 }
60 Some(TaskType::LLMJudge) => {
61 let store = self.llm_response_store.read().await;
62 if let Some(response) = store.retrieve(dep_id) {
63 scoped_context.insert(dep_id.clone(), response.clone());
64 }
65 }
66 None => {}
67 }
68 }
69
70 Value::Object(scoped_context)
71 }
72
73 fn build_context_map(&self, value: &Value) -> serde_json::Map<String, Value> {
74 match value {
75 Value::Object(obj) => obj.clone(),
76 _ => {
77 let mut map = serde_json::Map::new();
78 map.insert("context".to_string(), value.clone());
79 map
80 }
81 }
82 }
83
84 async fn store_assertion(
85 &self,
86 task_id: String,
87 start_time: DateTime<Utc>,
88 end_time: DateTime<Utc>,
89 result: AssertionResult,
90 ) {
91 self.assertion_store
92 .write()
93 .await
94 .store(task_id, start_time, end_time, result);
95 }
96
97 async fn store_llm_response(&self, task_id: String, response: Value) {
98 self.llm_response_store
99 .write()
100 .await
101 .store(task_id, response);
102 }
103}
104
/// Decides whether a task is ready to run based on the state of its
/// dependencies in the shared registry and result stores.
struct DependencyChecker {
    // Cloned handle onto the run's shared state (stores + registry).
    context: ExecutionContext,
}
108
impl DependencyChecker {
    fn new(context: ExecutionContext) -> Self {
        Self { context }
    }

    /// Tri-state readiness check for `task_id`:
    ///
    /// - `Some(true)`  — all dependencies satisfied; the task may run now.
    /// - `Some(false)` — the task was just marked skipped (a dependency was
    ///   itself skipped, a conditional dependency never completed, or a
    ///   conditional dependency's assertion failed).
    /// - `None`        — a non-conditional dependency has not completed yet;
    ///   the task is simply not ready (the caller filters it out, no state
    ///   change is made).
    async fn check_dependencies_satisfied(&self, task_id: &str) -> Option<bool> {
        debug!("Checking dependencies for task: {}", task_id);
        // Snapshot the dependency list under a short-lived read lock.
        // No registered dependencies means the task is immediately runnable.
        let dependencies = {
            let registry = self.context.task_registry.read().await;
            match registry.get_dependencies(task_id) {
                Some(deps) => deps,
                None => {
                    debug!("Task '{}' has no dependencies, ready to execute", task_id);
                    return Some(true);
                }
            }
        };

        debug!("Task '{}' has dependencies: {:?}", task_id, dependencies);

        // Snapshot per-dependency flags in one pass, then release the registry
        // lock before the per-dependency checks below (which take their own
        // locks; mark_skipped needs a write lock on the same registry).
        let dep_metadata = {
            let registry = self.context.task_registry.read().await;
            dependencies
                .iter()
                .map(|dep_id| {
                    (
                        dep_id.clone(),
                        registry.is_conditional(dep_id),
                        registry.is_skipped(dep_id),
                    )
                })
                .collect::<Vec<_>>()
        };

        for (dep_id, is_conditional, is_skipped) in dep_metadata {
            debug!(
                "Checking dependency '{}' for task '{}': conditional={}, skipped={}",
                dep_id, task_id, is_conditional, is_skipped
            );
            // Skips propagate: if any dependency was skipped, skip this task too.
            if is_skipped {
                self.mark_skipped(task_id).await;
                return Some(false);
            }

            let completed = self.check_task_completed(&dep_id).await;
            if !completed {
                // An incomplete *conditional* dependency cannot gate this task
                // anymore, so the task is skipped rather than retried; an
                // incomplete ordinary dependency just means "not ready yet".
                if is_conditional {
                    self.mark_skipped(task_id).await;
                    return Some(false);
                }
                return None;
            }

            // Conditional gate: the dependency's assertion must have passed.
            // The `?` propagates None if no assertion result is stored for it.
            if is_conditional && !self.check_assertion_passed(&dep_id).await? {
                self.mark_skipped(task_id).await;
                return Some(false);
            }
        }

        Some(true)
    }

    /// A task counts as completed once its result appears in the store that
    /// matches its registered type. Unknown task ids are never complete.
    async fn check_task_completed(&self, task_id: &str) -> bool {
        let registry = self.context.task_registry.read().await;
        match registry.get_type(task_id) {
            Some(TaskType::Assertion) => self
                .context
                .assertion_store
                .read()
                .await
                .retrieve(task_id)
                .is_some(),
            Some(TaskType::LLMJudge) => self
                .context
                .llm_response_store
                .read()
                .await
                .retrieve(task_id)
                .is_some(),
            None => false,
        }
    }

    /// Whether the stored assertion for `task_id` passed; `None` if no
    /// assertion result has been stored yet.
    async fn check_assertion_passed(&self, task_id: &str) -> Option<bool> {
        self.context
            .assertion_store
            .read()
            .await
            .retrieve(task_id)
            .map(|res| res.2.passed)
    }

    /// Flag `task_id` as skipped in the registry (write lock).
    async fn mark_skipped(&self, task_id: &str) {
        self.context
            .task_registry
            .write()
            .await
            .mark_skipped(task_id.to_string());
    }

    /// Keep only the tasks whose dependencies are satisfied right now.
    /// Side effect: tasks found unrunnable may be marked skipped (see
    /// `check_dependencies_satisfied`).
    async fn filter_executable_tasks<'a>(&self, task_ids: &'a [String]) -> Vec<&'a str> {
        debug!("Filtering executable tasks from: {:?}", task_ids);
        let mut executable = Vec::with_capacity(task_ids.len());

        for task_id in task_ids {
            if let Some(true) = self.check_dependencies_satisfied(task_id).await {
                executable.push(task_id.as_str());
            }
        }

        executable
    }
}
223
/// Runs one stage's worth of assertion and LLM-judge tasks concurrently.
struct TaskExecutor {
    // Shared run state (stores, registry, base context).
    context: ExecutionContext,
    // Profile describing all tasks and the LLM workflow.
    profile: Arc<GenAIEvalProfile>,
}
228
229impl TaskExecutor {
230 fn new(context: ExecutionContext, profile: Arc<GenAIEvalProfile>) -> Self {
231 debug!("Creating TaskExecutor");
232 Self { context, profile }
233 }
234
235 #[instrument(skip_all)]
236 async fn execute_level(&self, task_ids: &[String]) -> Result<(), EvaluationError> {
237 let checker = DependencyChecker::new(self.context.clone());
238 let executable_tasks = checker.filter_executable_tasks(task_ids).await;
239
240 debug!("Executable tasks for level: {:?}", executable_tasks);
241
242 if executable_tasks.is_empty() {
243 return Ok(());
244 }
245
246 let (assertions, judges) = self.partition_tasks(executable_tasks).await;
247
248 debug!(
249 "Executing level with {} assertions and {} LLM judges",
250 assertions.len(),
251 judges.len()
252 );
253
254 let _result = tokio::try_join!(
255 self.execute_assertions(&assertions),
256 self.execute_llm_judges(&judges)
257 )?;
258
259 Ok(())
260 }
261
262 async fn partition_tasks<'a>(&self, task_ids: Vec<&'a str>) -> (Vec<&'a str>, Vec<&'a str>) {
263 let registry = self.context.task_registry.read().await;
264 let mut assertions = Vec::new();
265 let mut judges = Vec::new();
266
267 for id in task_ids {
268 match registry.get_type(id) {
269 Some(TaskType::Assertion) => assertions.push(id),
270 Some(TaskType::LLMJudge) => judges.push(id),
271 None => continue,
272 }
273 }
274
275 (assertions, judges)
276 }
277
278 async fn execute_assertions(&self, task_ids: &[&str]) -> Result<(), EvaluationError> {
279 debug!("Executing assertion tasks: {:?}", task_ids);
280 if task_ids.is_empty() {
281 return Ok(());
282 }
283
284 let mut join_set = JoinSet::new();
285
286 for &task_id in task_ids {
287 let task_id = task_id.to_string();
288 let context = self.context.clone();
289 let profile = self.profile.clone();
290
291 join_set.spawn(async move {
292 Self::execute_assertion_task(&task_id, &context, &profile).await
293 });
294 }
295
296 while let Some(result) = join_set.join_next().await {
297 result.map_err(|e| {
298 EvaluationError::GenAIEvaluatorError(format!("Task join error: {}", e))
299 })??;
300 }
301
302 Ok(())
303 }
304
305 async fn execute_llm_judges(&self, task_ids: &[&str]) -> Result<(), EvaluationError> {
306 debug!("Executing LLM judge tasks: {:?}", task_ids);
307 if task_ids.is_empty() {
308 return Ok(());
309 }
310
311 let mut join_set = JoinSet::new();
312
313 for &task_id in task_ids {
314 let task_id = task_id.to_string();
315 let context = self.context.clone();
316 let profile = self.profile.clone();
317
318 join_set.spawn(async move {
319 let result = Self::execute_llm_judge_task(&task_id, &context, &profile).await;
320 result
321 });
322 }
323
324 let mut results = HashMap::with_capacity(task_ids.len());
325 while let Some(result) = join_set.join_next().await {
326 let (judge_id, start_time, response) = result.map_err(|e| {
327 EvaluationError::GenAIEvaluatorError(format!("Task join error: {}", e))
328 })??;
329 results.insert(judge_id, (start_time, response));
330 }
331
332 self.process_llm_judge_results(results).await?;
333 Ok(())
334 }
335
336 #[instrument(skip_all, fields(task_id = %task_id))]
337 async fn execute_assertion_task(
338 task_id: &str,
339 context: &ExecutionContext,
340 profile: &GenAIEvalProfile,
341 ) -> Result<(), EvaluationError> {
342 let start_time = Utc::now();
343
344 let task = profile
345 .get_assertion_by_id(task_id)
346 .ok_or_else(|| EvaluationError::TaskNotFound(task_id.to_string()))?;
347
348 let scoped_context = context.build_scoped_context(&task.depends_on).await;
349 let result = task.execute(&scoped_context)?;
350
351 let end_time = Utc::now();
352 context
353 .store_assertion(task_id.to_string(), start_time, end_time, result)
354 .await;
355 Ok(())
356 }
357
358 #[instrument(skip_all, fields(task_id = %task_id))]
359 async fn execute_llm_judge_task(
360 task_id: &str,
361 context: &ExecutionContext,
362 profile: &GenAIEvalProfile,
363 ) -> Result<(String, DateTime<Utc>, serde_json::Value), EvaluationError> {
364 debug!("Starting LLM judge task: {}", task_id);
365 let start_time = Utc::now();
366 let judge = profile
367 .get_llm_judge_by_id(task_id)
368 .ok_or_else(|| EvaluationError::TaskNotFound(task_id.to_string()))?;
369
370 debug!("Building scoped context for: {}", task_id);
371 let scoped_context = context.build_scoped_context(&judge.depends_on).await;
372
373 let workflow = profile.workflow.as_ref().ok_or_else(|| {
374 EvaluationError::GenAIEvaluatorError("No workflow defined".to_string())
375 })?;
376
377 debug!("Executing workflow task: {}", task_id);
378
379 let response = workflow
381 .execute_task(task_id, &scoped_context)
382 .await
383 .inspect_err(|e| error!("LLM task {} failed: {:?}", task_id, e))?;
384
385 debug!("Successfully completed LLM judge task: {}", task_id);
386 Ok((task_id.to_string(), start_time, response))
387 }
388
389 async fn process_llm_judge_results(
390 &self,
391 results: HashMap<String, (DateTime<Utc>, Value)>,
392 ) -> Result<(), EvaluationError> {
393 for (task_id, (start_time, response)) in results {
394 if let Some(task) = self.profile.get_llm_judge_by_id(&task_id) {
395 let assertion_result = task.execute(&response)?;
396
397 self.context
398 .store_llm_response(task_id.clone(), response)
399 .await;
400
401 self.context
402 .store_assertion(task_id, start_time, Utc::now(), assertion_result)
403 .await;
404 }
405 }
406 Ok(())
407 }
408}
409
/// Turns the run's stored results into a `GenAIEvalSet`.
struct ResultCollector {
    // Shared run state; read-only at collection time.
    context: ExecutionContext,
}
413
414impl ResultCollector {
415 fn new(context: ExecutionContext) -> Self {
416 Self { context }
417 }
418
419 async fn build_eval_set(
420 &self,
421 record: &GenAIEvalRecord,
422 profile: &GenAIEvalProfile,
423 duration_ms: i64,
424 execution_plan: ExecutionPlan,
425 ) -> GenAIEvalSet {
426 let mut passed_count = 0;
427 let mut failed_count = 0;
428 let mut records = Vec::new();
429
430 let assert_store = self.context.assertion_store.read().await;
431
432 for assertion in &profile.assertion_tasks {
433 if let Some((start_time, end_time, result)) = assert_store.retrieve(&assertion.id) {
434 if !assertion.condition {
435 if result.passed {
436 passed_count += 1;
437 } else {
438 failed_count += 1;
439 }
440 }
441
442 let stage = *self.context.task_stages.get(&assertion.id).unwrap_or(&-1);
443
444 records.push(scouter_types::GenAIEvalTaskResult {
445 created_at: chrono::Utc::now(),
446 start_time,
447 end_time,
448 record_uid: record.uid.clone(),
449 entity_id: record.entity_id,
450 task_id: assertion.id.clone(),
451 task_type: assertion.task_type.clone(),
452 passed: result.passed,
453 value: result.to_metric_value(),
454 field_path: assertion.field_path.clone(),
455 expected: result.expected.clone(),
456 actual: result.actual.clone(),
457 message: result.message.clone(),
458 operator: assertion.operator.clone(),
459 entity_uid: String::new(),
460 condition: assertion.condition,
461 stage,
462 });
463 }
464 }
465
466 for judge in &profile.llm_judge_tasks {
467 if let Some((start_time, end_time, result)) = assert_store.retrieve(&judge.id) {
468 if !judge.condition {
469 if result.passed {
470 passed_count += 1;
471 } else {
472 failed_count += 1;
473 }
474 }
475
476 let stage = *self.context.task_stages.get(&judge.id).unwrap_or(&-1);
477
478 records.push(scouter_types::GenAIEvalTaskResult {
479 created_at: chrono::Utc::now(),
480 start_time,
481 end_time,
482 record_uid: record.uid.clone(),
483 entity_id: record.entity_id,
484 task_id: judge.id.clone(),
485 task_type: judge.task_type.clone(),
486 passed: result.passed,
487 value: result.to_metric_value(),
488 field_path: judge.field_path.clone(),
489 expected: judge.expected_value.clone(),
490 actual: result.actual.clone(),
491 message: result.message.clone(),
492 operator: judge.operator.clone(),
493 entity_uid: String::new(),
494 condition: judge.condition,
495 stage,
496 });
497 }
498 }
499
500 let workflow_record = scouter_types::GenAIEvalWorkflowResult {
501 created_at: chrono::Utc::now(),
502 id: record.id,
503 entity_id: record.entity_id,
504 record_uid: record.uid.clone(),
505 total_tasks: passed_count + failed_count,
506 passed_tasks: passed_count,
507 failed_tasks: failed_count,
508 pass_rate: if passed_count + failed_count == 0 {
509 0.0
510 } else {
511 passed_count as f64 / (passed_count + failed_count) as f64
512 },
513 duration_ms,
514 entity_uid: String::new(),
515 execution_plan,
516 };
517
518 GenAIEvalSet::new(records, workflow_record)
519 }
520}
521
522pub struct GenAIEvaluator;
523
524impl GenAIEvaluator {
525 #[instrument(skip_all, fields(record_uid = %record.uid))]
526 pub async fn process_event_record(
527 record: &GenAIEvalRecord,
528 profile: Arc<GenAIEvalProfile>,
529 ) -> Result<GenAIEvalSet, EvaluationError> {
530 let begin = chrono::Utc::now();
531
532 let mut registry = TaskRegistry::new();
533 Self::register_tasks(&mut registry, &profile);
534
535 let execution_plan = profile.get_execution_plan()?;
536 let context = ExecutionContext::new(record.context.clone(), registry, &execution_plan);
537 let executor = TaskExecutor::new(context.clone(), profile.clone());
538
539 debug!(
540 "Starting evaluation for record: {} with {} stages",
541 record.uid,
542 execution_plan.stages.len()
543 );
544
545 for (stage_idx, stage_tasks) in execution_plan.stages.iter().enumerate() {
546 debug!(
547 "Executing stage {} with {} tasks",
548 stage_idx,
549 stage_tasks.len()
550 );
551 executor
552 .execute_level(stage_tasks)
553 .await
554 .inspect_err(|e| error!("Failed to execute stage {}: {:?}", stage_idx, e))?;
555 }
556
557 let end = chrono::Utc::now();
558 let duration_ms = (end - begin).num_milliseconds();
559
560 let collector = ResultCollector::new(context);
561 let eval_set = collector
562 .build_eval_set(record, &profile, duration_ms, execution_plan)
563 .await;
564
565 Ok(eval_set)
566 }
567
568 fn register_tasks(registry: &mut TaskRegistry, profile: &GenAIEvalProfile) {
569 for task in &profile.assertion_tasks {
570 registry.register(task.id.clone(), TaskType::Assertion, task.condition);
571 if !task.depends_on.is_empty() {
572 registry.register_dependencies(task.id.clone(), task.depends_on.clone());
573 }
574 }
575
576 for task in &profile.llm_judge_tasks {
577 registry.register(task.id.clone(), TaskType::LLMJudge, task.condition);
578 if !task.depends_on.is_empty() {
579 registry.register_dependencies(task.id.clone(), task.depends_on.clone());
580 }
581 }
582 }
583}
584
#[cfg(test)]
mod tests {

    use chrono::Utc;
    use potato_head::mock::{create_score_prompt, LLMTestServer};
    use scouter_types::genai::{
        AssertionTask, ComparisonOperator, GenAIAlertConfig, GenAIEvalConfig, GenAIEvalProfile,
        LLMJudgeTask,
    };
    use scouter_types::genai::{EvaluationTaskType, EvaluationTasks};
    use scouter_types::GenAIEvalRecord;
    use serde_json::Value;
    use std::sync::Arc;

    use crate::evaluate::GenAIEvaluator;

    /// Profile with one root assertion, one LLM judge, and two assertions
    /// that depend on the judge's response — 4 non-conditional tasks total.
    async fn create_assert_judge_profile() -> GenAIEvalProfile {
        let prompt = create_score_prompt(Some(vec!["input".to_string()]));

        // Stage 1: direct check on the record's input context.
        let assertion_level_1 = AssertionTask {
            id: "input_check".to_string(),
            field_path: Some("input.foo".to_string()),
            operator: ComparisonOperator::Equals,
            expected_value: Value::String("bar".to_string()),
            description: Some("Check if input.foo is bar".to_string()),
            task_type: EvaluationTaskType::Assertion,
            depends_on: vec![],
            result: None,
            condition: false,
        };

        let judge_task_level_1 = LLMJudgeTask::new_rs(
            "query_relevance",
            prompt.clone(),
            Value::Number(1.into()),
            Some("score".to_string()),
            ComparisonOperator::GreaterThanOrEqual,
            None,
            None,
            None,
            None,
        );

        // Stage 2: both assertions consume the judge's response fields.
        let assert_query_score = AssertionTask {
            id: "assert_score".to_string(),
            field_path: Some("query_relevance.score".to_string()),
            operator: ComparisonOperator::IsNumeric,
            expected_value: Value::Bool(true),
            depends_on: vec!["query_relevance".to_string()],
            task_type: EvaluationTaskType::Assertion,
            description: Some("Check that score is numeric".to_string()),
            result: None,
            condition: false,
        };

        let assert_query_reason = AssertionTask {
            id: "assert_reason".to_string(),
            field_path: Some("query_relevance.reason".to_string()),
            operator: ComparisonOperator::IsString,
            expected_value: Value::Bool(true),
            depends_on: vec!["query_relevance".to_string()],
            task_type: EvaluationTaskType::Assertion,
            description: Some("Check that reason is alphabetic".to_string()),
            result: None,
            condition: false,
        };

        let tasks = EvaluationTasks::new()
            .add_task(assertion_level_1)
            .add_task(judge_task_level_1)
            .add_task(assert_query_score)
            .add_task(assert_query_reason)
            .build();

        let alert_config = GenAIAlertConfig::default();

        let drift_config =
            GenAIEvalConfig::new("scouter", "ML", "0.1.0", 1.0, alert_config, None).unwrap();

        GenAIEvalProfile::new(drift_config, tasks).await.unwrap()
    }

    /// Assertion-only profile: three independent checks, one of which
    /// (`input_bar_check`) fails for the record used in the test below.
    async fn create_assert_profile() -> GenAIEvalProfile {
        let assert1 = AssertionTask {
            id: "input_foo_check".to_string(),
            field_path: Some("input.foo".to_string()),
            operator: ComparisonOperator::Equals,
            expected_value: Value::String("bar".to_string()),
            description: Some("Check if input.foo is bar".to_string()),
            task_type: EvaluationTaskType::Assertion,
            depends_on: vec![],
            result: None,
            condition: false,
        };
        let assert2 = AssertionTask {
            id: "input_bar_check".to_string(),
            field_path: Some("input.bar".to_string()),
            operator: ComparisonOperator::IsNumeric,
            expected_value: Value::Bool(true),
            depends_on: vec![],
            task_type: EvaluationTaskType::Assertion,
            description: Some("Check that bar is numeric".to_string()),
            result: None,
            condition: false,
        };

        let assert3 = AssertionTask {
            id: "input_baz_check".to_string(),
            field_path: Some("input.baz".to_string()),
            operator: ComparisonOperator::HasLengthEqual,
            expected_value: Value::Number(3.into()),
            depends_on: vec![],
            task_type: EvaluationTaskType::Assertion,
            description: Some("Check that baz has length 3".to_string()),
            result: None,
            condition: false,
        };

        let tasks = EvaluationTasks::new()
            .add_task(assert1)
            .add_task(assert2)
            .add_task(assert3)
            .build();

        let alert_config = GenAIAlertConfig::default();

        let drift_config =
            GenAIEvalConfig::new("scouter", "ML", "0.1.0", 1.0, alert_config, None).unwrap();

        GenAIEvalProfile::new(drift_config, tasks).await.unwrap()
    }

    /// All four tasks (1 assertion + 1 judge + 2 dependent assertions) pass.
    #[test]
    fn test_evaluator_assert_judge_all_pass() {
        let mut mock = LLMTestServer::new();
        mock.start_server().unwrap();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let profile = runtime.block_on(async { create_assert_judge_profile().await });

        assert!(profile.has_llm_tasks());
        assert!(profile.has_assertions());

        let context = serde_json::json!({
            "input": {
                "foo": "bar" }
        });

        let record = GenAIEvalRecord::new_rs(
            context,
            Utc::now(),
            "UID123".to_string(),
            "ENTITY123".to_string(),
            None,
            None,
        );

        let result_set = runtime.block_on(async {
            GenAIEvaluator::process_event_record(&record, Arc::new(profile)).await
        });

        // assert_eq! (instead of assert!(a == b)) so a failure prints both sides.
        let eval_set = result_set.unwrap();
        assert_eq!(eval_set.passed_tasks(), 4);
        assert_eq!(eval_set.failed_tasks(), 0);

        mock.stop_server().unwrap();
    }

    /// `input.bar` is a string, so the IsNumeric assertion fails; the other
    /// two assertions pass.
    #[test]
    fn test_evaluator_assert_one_fail() {
        // NOTE(review): the mock server is started even though this profile
        // has no LLM tasks — presumably profile construction still needs it;
        // confirm before removing.
        let mut mock = LLMTestServer::new();
        mock.start_server().unwrap();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let profile = runtime.block_on(async { create_assert_profile().await });

        assert!(!profile.has_llm_tasks());
        assert!(profile.has_assertions());

        let context = serde_json::json!({
            "input": {
                "foo": "bar",
                "bar": "not_a_number",
                "baz": [1, 2, 3]}
        });

        let record = GenAIEvalRecord::new_rs(
            context,
            Utc::now(),
            "UID123".to_string(),
            "ENTITY123".to_string(),
            None,
            None,
        );

        let result_set = runtime.block_on(async {
            GenAIEvaluator::process_event_record(&record, Arc::new(profile)).await
        });

        let eval_set = result_set.unwrap();
        assert_eq!(eval_set.passed_tasks(), 2);
        assert_eq!(eval_set.failed_tasks(), 1);

        mock.stop_server().unwrap();
    }
}
789}