1use std::collections::HashSet;
23use std::sync::Arc;
24use std::time::Instant;
25
26use serde::{Deserialize, Serialize};
27use tokio_util::sync::CancellationToken;
28use zeph_common::SessionId;
29use zeph_common::timestamp;
30use zeph_llm::any::AnyProvider;
31use zeph_memory::semantic::SemanticMemory;
32use zeph_memory::store::experiments::NewExperimentResult;
33
34use super::error::EvalError;
35use super::evaluator::Evaluator;
36use super::generator::VariationGenerator;
37use super::snapshot::ConfigSnapshot;
38use super::types::{ExperimentResult, ExperimentSource, Variation};
39use zeph_config::ExperimentConfig;
40
/// Summary of one experiment session, as returned by `ExperimentEngine::run`.
///
/// NOTE(review): `baseline_score` and `final_score` are `NaN` when the session
/// is cancelled before the baseline finishes. `serde_json` presumably writes
/// non-finite floats as `null`, which would not deserialize back into `f64` —
/// confirm before round-tripping such reports through JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExperimentSessionReport {
    /// Identifier of the session that produced this report.
    pub session_id: SessionId,
    /// One entry per variation that yielded a non-NaN mean score, in evaluation order.
    pub results: Vec<ExperimentResult>,
    /// The starting snapshot with every accepted variation applied on top.
    pub best_config: ConfigSnapshot,
    /// Mean score of the initial baseline evaluation (`NaN` if cancelled before it finished).
    pub baseline_score: f64,
    /// Best score held when the session ended (`NaN` if cancelled before the baseline).
    pub final_score: f64,
    /// `final_score - baseline_score`; `0.0` if cancelled before the baseline.
    pub total_improvement: f64,
    /// Wall-clock time spent in the session, in milliseconds.
    pub wall_time_ms: u64,
    /// Whether the engine's cancellation token was cancelled when the report was built.
    pub cancelled: bool,
}
74
/// Drives one experiment session: scores a baseline, then repeatedly asks the
/// generator for a variation, scores it, and keeps it when it improves enough.
pub struct ExperimentEngine {
    /// Scores a provider and reports a mean score, p50 latency and token count.
    evaluator: Evaluator,
    /// Source of candidate variations; returning `None` ends the session.
    generator: Box<dyn VariationGenerator>,
    /// Provider under test; cloned and patched with generation overrides per candidate.
    subject: Arc<AnyProvider>,
    /// Configuration snapshot the session starts from.
    baseline: ConfigSnapshot,
    /// Session limits (`max_experiments`, `max_wall_time_secs`) and the
    /// acceptance threshold (`min_improvement`).
    config: ExperimentConfig,
    /// Optional store; when present, every scored variation is inserted into its SQLite backend.
    memory: Option<Arc<SemanticMemory>>,
    /// Identifier generated once in `new` and attached to every result and log line.
    session_id: SessionId,
    /// Token that aborts the session; clones are handed out by `cancel_token`.
    cancel: CancellationToken,
    /// Origin tag copied into every result; `ExperimentSource::Manual` unless overridden.
    source: ExperimentSource,
}
106
/// Number of back-to-back NaN candidate scores after which the search loop gives up.
const MAX_CONSECUTIVE_NAN: u32 = 3;
110
111impl ExperimentEngine {
112 pub fn new(
125 evaluator: Evaluator,
126 generator: Box<dyn VariationGenerator>,
127 subject: Arc<AnyProvider>,
128 baseline: ConfigSnapshot,
129 config: ExperimentConfig,
130 memory: Option<Arc<SemanticMemory>>,
131 ) -> Self {
132 Self {
133 evaluator,
134 generator,
135 subject,
136 baseline,
137 config,
138 memory,
139 session_id: SessionId::generate(),
140 cancel: CancellationToken::new(),
141 source: ExperimentSource::Manual,
142 }
143 }
144
145 #[must_use]
150 pub fn with_source(mut self, source: ExperimentSource) -> Self {
151 self.source = source;
152 self
153 }
154
155 #[must_use]
160 pub fn cancel_token(&self) -> CancellationToken {
161 self.cancel.clone()
162 }
163
/// Cancels the engine's token. `run` checks this token before the baseline
/// evaluation and before every candidate evaluation.
pub fn stop(&self) {
    self.cancel.cancel();
}
170
171 #[tracing::instrument(
192 name = "experiments.engine.run",
193 skip(self),
194 fields(session_id = %self.session_id, source = %self.source)
195 )]
196 pub async fn run(&mut self) -> Result<ExperimentSessionReport, EvalError> {
197 let start = Instant::now();
198 let best_snapshot = self.baseline.clone();
199
200 let baseline_report = tokio::select! {
203 biased;
204 () = self.cancel.cancelled() => {
205 tracing::info!(session_id = %self.session_id, "cancelled before baseline");
206 #[allow(clippy::cast_possible_truncation)]
207 return Ok(ExperimentSessionReport {
208 session_id: self.session_id.clone(),
209 results: vec![],
210 best_config: best_snapshot,
211 baseline_score: f64::NAN,
212 final_score: f64::NAN,
213 total_improvement: 0.0,
214 wall_time_ms: start.elapsed().as_millis() as u64,
215 cancelled: true,
216 });
217 }
218 report = self.evaluator.evaluate(&self.subject) => report?,
219 };
220
221 let initial_baseline_score = baseline_report.mean_score;
223 if initial_baseline_score.is_nan() {
224 return Err(EvalError::Storage(
225 "baseline evaluation produced NaN mean score; \
226 check evaluator budget and judge responses"
227 .into(),
228 ));
229 }
230 tracing::info!(
231 session_id = %self.session_id,
232 baseline_score = initial_baseline_score,
233 "experiment session started"
234 );
235 self.run_loop(start, initial_baseline_score, best_snapshot)
236 .await
237 }
238
239 #[allow(clippy::too_many_lines)] #[tracing::instrument(
246 name = "experiments.engine.run_loop",
247 skip(self, start, best_snapshot),
248 fields(session_id = %self.session_id, source = %self.source)
249 )]
250 async fn run_loop(
251 &mut self,
252 start: Instant,
253 initial_baseline_score: f64,
254 mut best_snapshot: ConfigSnapshot,
255 ) -> Result<ExperimentSessionReport, EvalError> {
256 let wall_limit = std::time::Duration::from_secs(self.config.max_wall_time_secs);
257 let mut results: Vec<ExperimentResult> = Vec::new();
258 let mut visited: HashSet<Variation> = HashSet::new();
259 let (mut best_score, mut consecutive_nan) = (initial_baseline_score, 0u32);
260
261 'main: loop {
262 if results.len() >= self.config.max_experiments as usize {
263 tracing::info!(session_id = %self.session_id, "budget exhausted");
264 break;
265 }
266 if start.elapsed() >= wall_limit {
267 tracing::info!(session_id = %self.session_id, "wall-time limit reached");
268 break;
269 }
270 let Some(variation) = self.generator.next(&best_snapshot, &visited) else {
271 tracing::info!(session_id = %self.session_id, "search space exhausted");
272 break;
273 };
274 visited.insert(variation.clone());
275 let candidate_snapshot = best_snapshot.apply(&variation);
276 let patched = (*self.subject)
277 .clone()
278 .with_generation_overrides(candidate_snapshot.to_generation_overrides());
279 let candidate_report = tokio::select! {
280 biased;
281 () = self.cancel.cancelled() => {
282 tracing::info!(session_id = %self.session_id, "experiment cancelled");
283 break 'main;
284 }
285 report = self.evaluator.evaluate(&patched) => report?,
286 };
287 if candidate_report.mean_score.is_nan() {
288 consecutive_nan += 1;
289 tracing::warn!(
290 session_id = %self.session_id, param = %variation.parameter,
291 is_partial = candidate_report.is_partial, consecutive_nan,
292 "NaN mean score — skipping variation"
293 );
294 if consecutive_nan >= MAX_CONSECUTIVE_NAN {
295 tracing::warn!(session_id = %self.session_id, "consecutive NaN cap reached");
296 break;
297 }
298 continue;
299 }
300 consecutive_nan = 0;
301 let candidate_score = candidate_report.mean_score;
302 let delta = candidate_score - best_score;
303 let accepted = delta >= self.config.min_improvement;
304 let result_id = self
305 .persist_result(
306 &variation,
307 best_score,
308 candidate_score,
309 delta,
310 accepted,
311 candidate_report.p50_latency_ms,
312 candidate_report.total_tokens,
313 )
314 .await?;
315 let pre_accept_baseline = best_score;
316 self.log_outcome(&variation, delta, accepted, best_score);
317 if accepted {
318 best_snapshot = candidate_snapshot;
319 best_score = candidate_score;
320 }
321 results.push(ExperimentResult {
322 id: result_id,
323 session_id: self.session_id.clone(),
324 variation,
325 baseline_score: pre_accept_baseline,
326 candidate_score,
327 delta,
328 latency_ms: candidate_report.p50_latency_ms,
329 tokens_used: candidate_report.total_tokens,
330 accepted,
331 source: self.source.clone(),
332 created_at: timestamp::utc_now_rfc3339(),
333 });
334 }
335
336 #[allow(clippy::cast_possible_truncation)]
337 let wall_time_ms = start.elapsed().as_millis() as u64;
338 let total_improvement = best_score - initial_baseline_score;
339 tracing::info!(
340 session_id = %self.session_id, total = results.len(),
341 baseline_score = initial_baseline_score, final_score = best_score,
342 total_improvement, wall_time_ms, cancelled = self.cancel.is_cancelled(),
343 "experiment session complete"
344 );
345 Ok(ExperimentSessionReport {
346 session_id: self.session_id.clone(),
347 results,
348 best_config: best_snapshot,
349 baseline_score: initial_baseline_score,
350 final_score: best_score,
351 total_improvement,
352 wall_time_ms,
353 cancelled: self.cancel.is_cancelled(),
354 })
355 }
356
357 #[allow(clippy::too_many_arguments)] async fn persist_result(
367 &self,
368 variation: &Variation,
369 baseline_score: f64,
370 candidate_score: f64,
371 delta: f64,
372 accepted: bool,
373 p50_latency_ms: u64,
374 total_tokens: u64,
375 ) -> Result<Option<i64>, EvalError> {
376 let Some(mem) = &self.memory else {
377 return Ok(None);
378 };
379 let value_json = serde_json::to_string(&variation.value)
380 .map_err(|e| EvalError::Storage(e.to_string()))?;
381 #[allow(clippy::cast_possible_wrap)]
382 let new_result = NewExperimentResult {
383 session_id: self.session_id.as_str(),
384 parameter: variation.parameter.as_str(),
385 value_json: &value_json,
386 baseline_score,
387 candidate_score,
388 delta,
389 latency_ms: p50_latency_ms as i64,
390 tokens_used: total_tokens as i64,
391 accepted,
392 source: self.source.as_str(),
393 };
394 mem.sqlite()
395 .insert_experiment_result(&new_result)
396 .await
397 .map(Some)
398 .map_err(|e: zeph_memory::error::MemoryError| EvalError::Storage(e.to_string()))
399 }
400
401 fn log_outcome(&self, variation: &Variation, delta: f64, accepted: bool, new_score: f64) {
402 if accepted {
403 tracing::info!(
404 session_id = %self.session_id,
405 param = %variation.parameter,
406 value = %variation.value,
407 delta,
408 new_best_score = new_score,
409 "variation accepted — new baseline"
410 );
411 } else {
412 tracing::info!(
413 session_id = %self.session_id,
414 param = %variation.parameter,
415 value = %variation.value,
416 delta,
417 "variation rejected"
418 );
419 }
420 }
421}
422
423#[cfg(test)]
424mod tests {
425 #![allow(clippy::doc_markdown)]
426
427 use super::*;
428 use crate::benchmark::{BenchmarkCase, BenchmarkSet};
429 use crate::evaluator::Evaluator;
430 use crate::generator::VariationGenerator;
431 use crate::snapshot::ConfigSnapshot;
432 use crate::types::{ParameterKind, Variation, VariationValue};
433 use ordered_float::OrderedFloat;
434 use std::sync::Arc;
435 use zeph_config::ExperimentConfig;
436
437 fn make_benchmark() -> BenchmarkSet {
438 BenchmarkSet {
439 cases: vec![BenchmarkCase {
440 prompt: "What is 2+2?".into(),
441 context: None,
442 reference: None,
443 tags: None,
444 }],
445 }
446 }
447
448 fn default_config() -> ExperimentConfig {
449 ExperimentConfig {
450 max_experiments: 10,
451 max_wall_time_secs: 3600,
452 min_improvement: 0.0,
453 ..Default::default()
454 }
455 }
456
/// Test generator that hands out a fixed, pre-built list of variations in order.
struct NVariationGenerator {
    /// Variations to hand out, in order.
    variations: Vec<Variation>,
    /// Index of the next variation to consider.
    pos: usize,
}
462
463 impl NVariationGenerator {
464 fn new(n: usize) -> Self {
465 let variations = (0..n)
466 .map(|i| Variation {
467 parameter: ParameterKind::Temperature,
468 #[allow(clippy::cast_precision_loss)]
469 value: VariationValue::Float(OrderedFloat(0.5 + i as f64 * 0.1)),
470 })
471 .collect();
472 Self { variations, pos: 0 }
473 }
474 }
475
476 impl VariationGenerator for NVariationGenerator {
477 fn next(
478 &mut self,
479 _baseline: &ConfigSnapshot,
480 visited: &HashSet<Variation>,
481 ) -> Option<Variation> {
482 while self.pos < self.variations.len() {
483 let v = self.variations[self.pos].clone();
484 self.pos += 1;
485 if !visited.contains(&v) {
486 return Some(v);
487 }
488 }
489 None
490 }
491
492 fn name(&self) -> &'static str {
493 "n_variation"
494 }
495 }
496
497 #[cfg(test)]
498 fn make_subject_mock(n_responses: usize) -> zeph_llm::any::AnyProvider {
499 use zeph_llm::any::AnyProvider;
500 use zeph_llm::mock::MockProvider;
501
502 let responses: Vec<String> = (0..n_responses).map(|_| "Four".to_string()).collect();
506 AnyProvider::Mock(MockProvider::with_responses(responses))
507 }
508
509 #[cfg(test)]
510 fn make_judge_mock(n_responses: usize) -> zeph_llm::any::AnyProvider {
511 use zeph_llm::any::AnyProvider;
512 use zeph_llm::mock::MockProvider;
513
514 let responses: Vec<String> = (0..n_responses)
515 .map(|_| r#"{"score": 8.0, "reason": "correct"}"#.to_string())
516 .collect();
517 AnyProvider::Mock(MockProvider::with_responses(responses))
518 }
519
520 #[cfg(test)]
521 #[tokio::test]
522 async fn engine_completes_with_no_accepted_variations() {
523 let config = ExperimentConfig {
525 max_experiments: 10,
526 max_wall_time_secs: 3600,
527 min_improvement: 100.0,
528 ..Default::default()
529 };
530 let subject = make_subject_mock(2);
532 let judge = make_judge_mock(2);
533 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
534
535 let mut engine = ExperimentEngine::new(
536 evaluator,
537 Box::new(NVariationGenerator::new(1)),
538 Arc::new(subject),
539 ConfigSnapshot::default(),
540 config,
541 None,
542 );
543
544 let report = engine.run().await.unwrap();
545 assert_eq!(report.results.len(), 1);
546 assert!(!report.results[0].accepted);
547 assert!(!report.session_id.is_empty());
548 assert!(!report.cancelled);
549 }
550
551 #[cfg(test)]
552 #[tokio::test]
553 async fn engine_respects_max_experiments() {
554 let config = ExperimentConfig {
555 max_experiments: 3,
556 max_wall_time_secs: 3600,
557 min_improvement: 0.0,
558 ..Default::default()
559 };
560 let subject = make_subject_mock(4);
563 let judge = make_judge_mock(4);
564 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
565
566 let mut engine = ExperimentEngine::new(
567 evaluator,
568 Box::new(NVariationGenerator::new(5)),
569 Arc::new(subject),
570 ConfigSnapshot::default(),
571 config,
572 None,
573 );
574
575 let report = engine.run().await.unwrap();
576 assert_eq!(report.results.len(), 3);
577 assert!(!report.cancelled);
578 }
579
580 #[cfg(test)]
581 #[tokio::test]
582 async fn engine_cancellation_before_baseline() {
583 let config = ExperimentConfig {
585 max_experiments: 100,
586 max_wall_time_secs: 3600,
587 min_improvement: 0.0,
588 ..Default::default()
589 };
590 let subject = make_subject_mock(2);
591 let judge = make_judge_mock(2);
592 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
593
594 let mut engine = ExperimentEngine::new(
595 evaluator,
596 Box::new(NVariationGenerator::new(100)),
597 Arc::new(subject),
598 ConfigSnapshot::default(),
599 config,
600 None,
601 );
602 engine.stop(); let report = engine.run().await.unwrap();
604 assert!(report.cancelled);
605 assert!(report.results.is_empty());
606 }
607
608 #[cfg(test)]
609 #[tokio::test]
610 async fn engine_cancellation_stops_loop() {
611 let config = ExperimentConfig {
620 max_experiments: 10,
621 max_wall_time_secs: 3600,
622 min_improvement: 0.0,
623 ..Default::default()
624 };
625 let subject = make_subject_mock(2);
626 let judge = make_judge_mock(2);
627 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
628
629 let mut engine = ExperimentEngine::new(
630 evaluator,
631 Box::new(NVariationGenerator::new(10)),
632 Arc::new(subject),
633 ConfigSnapshot::default(),
634 config,
635 None,
636 );
637
638 let external_token = engine.cancel_token();
640 assert!(!external_token.is_cancelled());
641 engine.stop();
642 assert!(
643 external_token.is_cancelled(),
644 "cancel_token() must share the same token"
645 );
646
647 let report = engine.run().await.unwrap();
648 assert!(report.cancelled);
649 }
650
651 #[cfg(test)]
652 #[tokio::test]
653 async fn engine_progressive_baseline_updates() {
654 let config = ExperimentConfig {
657 max_experiments: 1,
658 max_wall_time_secs: 3600,
659 min_improvement: 0.0,
660 ..Default::default()
661 };
662 let subject = make_subject_mock(2);
664 let judge = make_judge_mock(2);
665 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
666
667 let initial_baseline = ConfigSnapshot::default();
668 let mut engine = ExperimentEngine::new(
669 evaluator,
670 Box::new(NVariationGenerator::new(1)),
671 Arc::new(subject),
672 initial_baseline.clone(),
673 config,
674 None,
675 );
676
677 let report = engine.run().await.unwrap();
678 assert_eq!(report.results.len(), 1);
679 assert!(report.results[0].accepted, "variation should be accepted");
680 assert!(
682 (report.best_config.temperature - initial_baseline.temperature).abs() > 1e-9,
683 "best_config.temperature should have changed after accepted variation"
684 );
685 assert!(!report.baseline_score.is_nan());
686 assert!(!report.final_score.is_nan());
687 assert!(
689 (report.results[0].baseline_score - report.baseline_score).abs() < 1e-9,
690 "result.baseline_score must equal initial baseline_score (pre-acceptance)"
691 );
692 }
693
694 #[cfg(test)]
695 #[tokio::test]
696 async fn engine_handles_search_space_exhaustion() {
697 let config = default_config();
698 let subject = make_subject_mock(1);
701 let judge = make_judge_mock(1);
702 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
703
704 let mut engine = ExperimentEngine::new(
705 evaluator,
706 Box::new(NVariationGenerator::new(0)),
707 Arc::new(subject),
708 ConfigSnapshot::default(),
709 config,
710 None,
711 );
712
713 let report = engine.run().await.unwrap();
714 assert!(report.results.is_empty());
715 assert!(!report.cancelled);
716 }
717
718 #[cfg(test)]
719 #[tokio::test]
720 async fn engine_skips_nan_scores() {
721 use zeph_llm::any::AnyProvider;
722 use zeph_llm::mock::MockProvider;
723
724 let config = ExperimentConfig {
730 max_experiments: 5,
731 max_wall_time_secs: 3600,
732 min_improvement: 0.0,
733 ..Default::default()
734 };
735 let subject = AnyProvider::Mock(MockProvider::with_responses(vec![
737 "A".into(),
738 "A".into(),
739 "A".into(),
740 "A".into(),
741 ]));
742 let judge = AnyProvider::Mock(MockProvider::with_responses(vec![
744 r#"{"score": 8.0, "reason": "ok"}"#.into(),
745 ]));
746 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
748
749 let mut engine = ExperimentEngine::new(
750 evaluator,
751 Box::new(NVariationGenerator::new(5)),
752 Arc::new(subject),
753 ConfigSnapshot::default(),
754 config,
755 None,
756 );
757
758 let report = engine.run().await.unwrap();
760 assert!(
762 report.results.is_empty(),
763 "all NaN iterations should be skipped"
764 );
765 assert!(!report.cancelled);
766 }
767
768 #[cfg(test)]
769 #[tokio::test]
770 async fn engine_nan_baseline_returns_error() {
771 use zeph_llm::any::AnyProvider;
772 use zeph_llm::mock::MockProvider;
773
774 let config = ExperimentConfig {
776 max_experiments: 5,
777 max_wall_time_secs: 3600,
778 min_improvement: 0.0,
779 ..Default::default()
780 };
781 let subject = AnyProvider::Mock(MockProvider::with_responses(vec!["A".into()]));
783 let judge = AnyProvider::Mock(MockProvider::with_responses(vec![]));
785 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
786
787 let mut engine = ExperimentEngine::new(
788 evaluator,
789 Box::new(NVariationGenerator::new(5)),
790 Arc::new(subject),
791 ConfigSnapshot::default(),
792 config,
793 None,
794 );
795
796 let result = engine.run().await;
797 assert!(result.is_err(), "NaN baseline should return an error");
798 let err = result.unwrap_err();
799 assert!(
800 matches!(err, EvalError::Storage(_)),
801 "expected EvalError::Storage, got: {err:?}"
802 );
803 }
804
805 #[cfg(test)]
806 #[tokio::test]
807 async fn engine_persists_results_to_sqlite() {
808 use zeph_memory::testing::mock_semantic_memory;
809
810 let memory = mock_semantic_memory().await.unwrap();
811 let config = ExperimentConfig {
812 max_experiments: 1,
813 max_wall_time_secs: 3600,
814 min_improvement: 0.0,
815 ..Default::default()
816 };
817 let subject = make_subject_mock(2);
819 let judge = make_judge_mock(2);
820 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
821
822 let session_id = {
823 let mut engine = ExperimentEngine::new(
824 evaluator,
825 Box::new(NVariationGenerator::new(1)),
826 Arc::new(subject),
827 ConfigSnapshot::default(),
828 config,
829 Some(Arc::clone(&memory)),
830 );
831 engine.run().await.unwrap();
832 engine.session_id.clone()
833 };
834
835 let rows = memory
836 .sqlite()
837 .list_experiment_results(Some(&session_id), 10)
838 .await
839 .unwrap();
840 assert_eq!(rows.len(), 1, "expected one persisted result");
841 assert_eq!(rows[0].session_id, session_id.as_str());
842 }
843
/// A report with finite scores must survive a JSON serialize/deserialize cycle.
#[test]
fn session_report_serde_roundtrip() {
    let original = ExperimentSessionReport {
        session_id: SessionId::new("test-session"),
        results: vec![],
        best_config: ConfigSnapshot::default(),
        baseline_score: 7.5,
        final_score: 8.0,
        total_improvement: 0.5,
        wall_time_ms: 1_234,
        cancelled: false,
    };

    let encoded = serde_json::to_string(&original).expect("serialize");
    let decoded: ExperimentSessionReport = serde_json::from_str(&encoded).expect("deserialize");

    assert_eq!(decoded.session_id, original.session_id);
    let baseline_gap = (decoded.baseline_score - original.baseline_score).abs();
    assert!(baseline_gap < f64::EPSILON);
    let final_gap = (decoded.final_score - original.final_score).abs();
    assert!(final_gap < f64::EPSILON);
    assert_eq!(decoded.wall_time_ms, original.wall_time_ms);
    assert!(!decoded.cancelled);
}
864
/// The timestamp helper must produce the 20-character `YYYY-MM-DDTHH:MM:SSZ` shape.
#[test]
fn utc_now_rfc3339_format() {
    let s = timestamp::utc_now_rfc3339();
    assert_eq!(s.len(), 20, "timestamp must be 20 chars (RFC 3339): {s}");

    // (byte offset, separator expected at that offset)
    let separators = [(4, "-"), (7, "-"), (10, "T"), (13, ":"), (16, ":")];
    for (offset, expected) in separators {
        assert_eq!(&s[offset..offset + 1], expected);
    }
    assert!(s.ends_with('Z'));
}
876
/// Smoke check: the timestamp helper returns a non-empty, 20-character string.
#[test]
fn utc_now_rfc3339_is_non_empty() {
    let stamp = timestamp::utc_now_rfc3339();
    let is_blank = stamp.is_empty();
    assert!(!is_blank);
    assert_eq!(stamp.len(), 20);
}
884
885 #[tokio::test]
886 async fn experiment_result_created_at_is_rfc3339() {
887 let config = ExperimentConfig {
888 max_experiments: 1,
889 max_wall_time_secs: 3600,
890 min_improvement: 0.0,
891 ..Default::default()
892 };
893 let subject = make_subject_mock(2);
894 let judge = make_judge_mock(2);
895 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
896
897 let mut engine = ExperimentEngine::new(
898 evaluator,
899 Box::new(NVariationGenerator::new(1)),
900 Arc::new(subject),
901 ConfigSnapshot::default(),
902 config,
903 None,
904 );
905
906 let report = engine.run().await.unwrap();
907 assert_eq!(report.results.len(), 1);
908 let created_at = &report.results[0].created_at;
909 assert!(!created_at.is_empty(), "created_at must not be empty");
910 assert_eq!(
911 created_at.len(),
912 20,
913 "RFC 3339 timestamp must be 20 chars: {created_at}"
914 );
915 assert!(
916 created_at.contains('T'),
917 "RFC 3339 timestamp must contain 'T': {created_at}"
918 );
919 assert!(
920 created_at.ends_with('Z'),
921 "RFC 3339 timestamp must end with 'Z': {created_at}"
922 );
923 }
924
/// Compile-time guard: `ExperimentEngine` must stay `Send`.
#[test]
fn experiment_engine_is_send() {
    fn require_send<T>()
    where
        T: Send,
    {
    }
    // Instantiating the generic is what enforces the bound; the call is a no-op.
    require_send::<ExperimentEngine>();
}
933
934 #[tokio::test]
935 async fn engine_with_source_scheduled_propagates_to_results() {
936 let config = ExperimentConfig {
937 max_experiments: 1,
938 max_wall_time_secs: 3600,
939 min_improvement: 0.0,
940 ..Default::default()
941 };
942 let subject = make_subject_mock(2);
943 let judge = make_judge_mock(2);
944 let evaluator = Evaluator::new(Arc::new(judge), make_benchmark(), 1_000_000).unwrap();
945
946 let mut engine = ExperimentEngine::new(
947 evaluator,
948 Box::new(NVariationGenerator::new(1)),
949 Arc::new(subject),
950 ConfigSnapshot::default(),
951 config,
952 None,
953 )
954 .with_source(ExperimentSource::Scheduled);
955
956 let report = engine.run().await.unwrap();
957 assert_eq!(report.results.len(), 1);
958 assert_eq!(
959 report.results[0].source,
960 ExperimentSource::Scheduled,
961 "with_source(Scheduled) must propagate to ExperimentResult"
962 );
963 }
964}