1use super::{
7 ChangeType, Comparator, ExecutionConfig, FieldChange, PolicyViolation, ReplayError, ReplayMode,
8 ReplayPolicy, ReplayResult, ReplayStats, ReplayStatus, SnapshotDiff, SyncModelExecutor,
9};
10use crate::models::DecisionSnapshot;
11use crate::storage::sync::SyncStorageBackend;
12use std::sync::Arc;
13use std::time::Instant;
14
15pub struct SyncReplayEngine<S: SyncStorageBackend> {
17 storage: S,
18 default_mode: ReplayMode,
19 executor: Option<Arc<dyn SyncModelExecutor>>,
20}
21
22impl<S: SyncStorageBackend> SyncReplayEngine<S> {
23 pub fn new(storage: S) -> Self {
25 Self {
26 storage,
27 default_mode: ReplayMode::Tolerant,
28 executor: None,
29 }
30 }
31
32 pub fn with_mode(storage: S, mode: ReplayMode) -> Self {
34 Self {
35 storage,
36 default_mode: mode,
37 executor: None,
38 }
39 }
40
41 pub fn with_executor(mut self, executor: Arc<dyn SyncModelExecutor>) -> Self {
43 self.executor = Some(executor);
44 self
45 }
46
47 pub fn executor(&self) -> Option<&dyn SyncModelExecutor> {
49 self.executor.as_ref().map(|arc| arc.as_ref())
50 }
51
52 pub fn default_mode(&self) -> ReplayMode {
54 self.default_mode.clone()
55 }
56
57 pub fn replay(
59 &self,
60 snapshot_id: &str,
61 mode: Option<ReplayMode>,
62 _context_overrides: Option<serde_json::Value>,
63 ) -> Result<ReplayResult, ReplayError> {
64 let start_time = Instant::now();
65 let replay_mode = mode.unwrap_or_else(|| self.default_mode());
66
67 let original_snapshot = self
68 .storage
69 .load_decision(snapshot_id)
70 .map_err(|e| ReplayError::StorageError(e.to_string()))?;
71
72 match replay_mode {
73 ReplayMode::ValidationOnly => {
74 Ok(ReplayResult {
76 status: ReplayStatus::Success,
77 original_snapshot,
78 replay_output: None,
79 outputs_match: true, diff: None,
81 policy_violations: Vec::new(),
82 execution_time_ms: start_time.elapsed().as_millis() as f64,
83 })
84 }
85 ReplayMode::Strict | ReplayMode::Tolerant => {
86 if self.executor.is_some() {
88 self.execute_replay(&original_snapshot, replay_mode, start_time)
89 } else {
90 let simulated_output = simulate_execution(&original_snapshot);
92 let outputs_match =
93 compare_outputs(&original_snapshot, &simulated_output, &replay_mode);
94
95 Ok(ReplayResult {
96 status: if outputs_match {
97 ReplayStatus::Success
98 } else {
99 ReplayStatus::Failed
100 },
101 original_snapshot,
102 replay_output: Some(simulated_output),
103 outputs_match,
104 diff: None, policy_violations: Vec::new(),
106 execution_time_ms: start_time.elapsed().as_millis() as f64,
107 })
108 }
109 }
110 }
111 }
112
113 pub fn replay_with_policy(
115 &self,
116 snapshot_id: &str,
117 policy: &ReplayPolicy,
118 mode: Option<ReplayMode>,
119 ) -> Result<ReplayResult, ReplayError> {
120 let mut result = self.replay(snapshot_id, mode, None)?;
121
122 let violations = self.validate(snapshot_id, policy)?;
124 result.policy_violations = violations;
125
126 if !result.policy_violations.is_empty() {
127 result.status = ReplayStatus::Failed;
128 }
129
130 Ok(result)
131 }
132
133 pub fn validate(
135 &self,
136 snapshot_id: &str,
137 policy: &ReplayPolicy,
138 ) -> Result<Vec<PolicyViolation>, ReplayError> {
139 let snapshot = self
140 .storage
141 .load_decision(snapshot_id)
142 .map_err(|e| ReplayError::StorageError(e.to_string()))?;
143
144 let mut violations = Vec::new();
145
146 for rule in &policy.rules {
148 match rule.comparator {
149 Comparator::ExactMatch => {
150 if rule.field == "function_name" {
153 if snapshot.function_name.is_empty() {
155 violations.push(PolicyViolation {
156 rule_name: format!("exact_match_{}", rule.field),
157 field: rule.field.clone(),
158 expected: "non-empty function name".to_string(),
159 actual: "empty".to_string(),
160 message: "Function name cannot be empty".to_string(),
161 });
162 }
163 }
164 }
165 Comparator::SemanticSimilarity => {
166 if rule.field == "output" && snapshot.outputs.is_empty() {
169 violations.push(PolicyViolation {
170 rule_name: format!("similarity_{}", rule.field),
171 field: rule.field.clone(),
172 expected: "at least one output".to_string(),
173 actual: "no outputs".to_string(),
174 message: "At least one output is required".to_string(),
175 });
176 }
177 }
178 Comparator::MaxIncreasePercent => {
179 }
182 Comparator::MaxDecreasePercent => {
183 }
186 Comparator::WithinRange => {
187 }
190 }
191 }
192
193 Ok(violations)
194 }
195
196 fn execute_replay(
198 &self,
199 original: &DecisionSnapshot,
200 mode: ReplayMode,
201 start_time: Instant,
202 ) -> Result<ReplayResult, ReplayError> {
203 if let Some(ref executor) = self.executor {
204 if let Some(ref params) = original.model_parameters {
206 if !executor.supports_model(¶ms.model_name) {
207 return Err(ReplayError::ExecutionError(format!(
208 "Executor '{}' does not support model '{}'",
209 executor.executor_name(),
210 params.model_name
211 )));
212 }
213 }
214
215 let config = ExecutionConfig::default();
217 let exec_result = executor.execute(
218 &original.inputs,
219 original.model_parameters.as_ref(),
220 &original.context,
221 &config,
222 )?;
223
224 let execution_time = start_time.elapsed().as_millis() as f64;
225
226 let tolerance = match mode {
228 ReplayMode::Strict => 1.0, ReplayMode::Tolerant => 0.8, ReplayMode::ValidationOnly => 0.0, };
232
233 let comparison =
234 executor.compare_outputs(&original.outputs, &exec_result.outputs, tolerance);
235
236 let replay_output = serde_json::to_value(&exec_result.outputs).ok();
237
238 Ok(ReplayResult {
239 status: if comparison.is_match {
240 ReplayStatus::Success
241 } else {
242 ReplayStatus::Failed
243 },
244 original_snapshot: original.clone(),
245 replay_output,
246 outputs_match: comparison.is_match,
247 diff: Some(SnapshotDiff {
248 inputs_changed: false,
249 outputs_changed: !comparison.is_match,
250 model_params_changed: false,
251 execution_time_delta_ms: execution_time
252 - original.execution_time_ms.unwrap_or(0.0),
253 changes: comparison
254 .field_comparisons
255 .iter()
256 .filter(|c| !c.is_match)
257 .map(|c| FieldChange {
258 field_path: format!("output.{}", c.field_name),
259 old_value: c.original_value.clone(),
260 new_value: c.replayed_value.clone(),
261 change_type: ChangeType::Modified,
262 })
263 .collect(),
264 }),
265 policy_violations: Vec::new(),
266 execution_time_ms: execution_time,
267 })
268 } else {
269 self.simulate_replay(original, mode, start_time)
271 }
272 }
273
274 pub fn get_replay_stats(&self, snapshot_ids: &[String]) -> Result<ReplayStats, ReplayError> {
276 let mut total_replays = 0;
277 let mut successful_replays = 0;
278 let mut failed_replays = 0;
279 let mut exact_matches = 0;
280 let mut mismatches = 0;
281 let mut total_execution_time_ms = 0.0;
282
283 for snapshot_id in snapshot_ids {
284 match self.replay(snapshot_id, None, None) {
285 Ok(result) => {
286 total_replays += 1;
287 total_execution_time_ms += result.execution_time_ms;
288
289 match result.status {
290 ReplayStatus::Success => {
291 successful_replays += 1;
292 if result.outputs_match {
293 exact_matches += 1;
294 } else {
295 mismatches += 1;
296 }
297 }
298 _ => {
299 failed_replays += 1;
300 mismatches += 1;
301 }
302 }
303 }
304 Err(_) => {
305 total_replays += 1;
306 failed_replays += 1;
307 mismatches += 1;
308 }
309 }
310 }
311
312 let average_execution_time_ms = if total_replays > 0 {
313 total_execution_time_ms / total_replays as f64
314 } else {
315 0.0
316 };
317
318 Ok(ReplayStats {
319 total_replays,
320 successful_replays,
321 failed_replays,
322 exact_matches,
323 mismatches,
324 average_execution_time_ms,
325 total_execution_time_ms,
326 })
327 }
328
329 fn simulate_replay(
330 &self,
331 original: &DecisionSnapshot,
332 mode: ReplayMode,
333 start_time: Instant,
334 ) -> Result<ReplayResult, ReplayError> {
335 let simulated_output = simulate_execution(original);
336 let outputs_match = compare_outputs(original, &simulated_output, &mode);
337
338 Ok(ReplayResult {
339 status: if outputs_match {
340 ReplayStatus::Success
341 } else {
342 ReplayStatus::Failed
343 },
344 original_snapshot: original.clone(),
345 replay_output: Some(simulated_output),
346 outputs_match,
347 diff: None,
348 policy_violations: Vec::new(),
349 execution_time_ms: start_time.elapsed().as_millis() as f64,
350 })
351 }
352}
353
354impl<S: SyncStorageBackend> Clone for SyncReplayEngine<S>
355where
356 S: Clone,
357{
358 fn clone(&self) -> Self {
359 Self {
360 storage: self.storage.clone(),
361 default_mode: self.default_mode.clone(),
362 executor: self.executor.clone(),
363 }
364 }
365}
366
367fn simulate_execution(decision: &DecisionSnapshot) -> serde_json::Value {
370 if let Some(output) = decision.outputs.first() {
372 output.value.clone()
373 } else {
374 serde_json::Value::Null
375 }
376}
377
378fn compare_outputs(
380 decision: &DecisionSnapshot,
381 simulated_output: &serde_json::Value,
382 mode: &ReplayMode,
383) -> bool {
384 if let Some(original_output) = decision.outputs.first() {
385 match mode {
386 ReplayMode::Strict => {
387 original_output.value == *simulated_output
389 }
390 ReplayMode::Tolerant => {
391 if original_output.value == *simulated_output {
393 true
394 } else {
395 false
397 }
398 }
399 ReplayMode::ValidationOnly => {
400 true
402 }
403 }
404 } else {
405 simulated_output.is_null()
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::*;
    use crate::storage::sync::MemoryStorageBackend;
    use serde_json::json;

    /// Builds a minimal decision with one input, one output and model parameters.
    fn create_test_decision() -> DecisionSnapshot {
        let input = Input::new("test_input", json!("value"), "string");
        let output = Output::new("test_output", json!("result"), "string");
        let model_params = ModelParameters::new("gpt-4");

        DecisionSnapshot::new("test_function")
            .with_module("test_module")
            .add_input(input)
            .add_output(output)
            .with_model_parameters(model_params)
            .add_tag("env", "test")
    }

    /// Creates an engine over in-memory storage that holds one test decision.
    fn engine_with_decision() -> (SyncReplayEngine<MemoryStorageBackend>, String) {
        let engine = SyncReplayEngine::new(MemoryStorageBackend::new());
        let decision_id = engine.storage.save_decision(&create_test_decision()).unwrap();
        (engine, decision_id)
    }

    #[test]
    fn test_sync_replay_validation_only() {
        let (engine, decision_id) = engine_with_decision();

        let result = engine
            .replay(&decision_id, Some(ReplayMode::ValidationOnly), None)
            .unwrap();

        assert_eq!(result.status, ReplayStatus::Success);
        assert!(result.outputs_match);
        assert!(result.replay_output.is_none());
    }

    #[test]
    fn test_sync_replay_tolerant_mode() {
        let (engine, decision_id) = engine_with_decision();

        let result = engine
            .replay(&decision_id, Some(ReplayMode::Tolerant), None)
            .unwrap();

        assert_eq!(result.status, ReplayStatus::Success);
        assert!(result.replay_output.is_some());
    }

    #[test]
    fn test_sync_replay_strict_mode_simulated() {
        let (engine, decision_id) = engine_with_decision();
        assert!(engine.executor().is_none());

        let result = engine
            .replay(&decision_id, Some(ReplayMode::Strict), None)
            .unwrap();

        assert_eq!(result.status, ReplayStatus::Success);
        assert!(result.outputs_match);
        assert_eq!(result.replay_output, Some(json!("result")));
        assert!(result.diff.is_none());
        assert!(result.execution_time_ms >= 0.0);
    }

    #[test]
    fn test_sync_replay_uses_default_mode() {
        let engine =
            SyncReplayEngine::with_mode(MemoryStorageBackend::new(), ReplayMode::ValidationOnly);
        assert!(matches!(engine.default_mode(), ReplayMode::ValidationOnly));

        let decision_id = engine.storage.save_decision(&create_test_decision()).unwrap();
        let result = engine.replay(&decision_id, None, None).unwrap();

        assert_eq!(result.status, ReplayStatus::Success);
        assert!(result.replay_output.is_none());
    }

    #[test]
    fn test_sync_replay_missing_snapshot_is_storage_error() {
        let engine = SyncReplayEngine::new(MemoryStorageBackend::new());

        let result = engine.replay("missing-snapshot-id", None, None);

        assert!(matches!(result, Err(ReplayError::StorageError(_))));
    }

    #[test]
    fn test_sync_replay_stats() {
        let engine = SyncReplayEngine::new(MemoryStorageBackend::new());

        let id1 = engine.storage.save_decision(&create_test_decision()).unwrap();
        let id2 = engine.storage.save_decision(&create_test_decision()).unwrap();

        let stats = engine.get_replay_stats(&[id1, id2]).unwrap();

        assert_eq!(stats.total_replays, 2);
        assert!(stats.total_execution_time_ms >= 0.0);
        assert!(stats.average_execution_time_ms >= 0.0);
    }

    #[test]
    fn test_sync_replay_stats_counts_failures() {
        let (engine, decision_id) = engine_with_decision();

        let stats = engine
            .get_replay_stats(&[decision_id, "missing-snapshot-id".to_string()])
            .unwrap();

        assert_eq!(stats.total_replays, 2);
        assert_eq!(stats.successful_replays, 1);
        assert_eq!(stats.failed_replays, 1);
        assert_eq!(stats.exact_matches, 1);
        assert_eq!(stats.mismatches, 1);
    }

    #[test]
    fn test_sync_replay_policy_validation() {
        let (engine, decision_id) = engine_with_decision();
        let policy = ReplayPolicy::new("test_policy".to_string())
            .with_exact_match("function_name".to_string());

        let violations = engine.validate(&decision_id, &policy).unwrap();

        assert!(violations.is_empty());
    }

    #[test]
    fn test_sync_replay_with_policy_passes() {
        let (engine, decision_id) = engine_with_decision();
        let policy = ReplayPolicy::new("test_policy".to_string())
            .with_exact_match("function_name".to_string());

        let result = engine
            .replay_with_policy(&decision_id, &policy, None)
            .unwrap();

        assert_eq!(result.status, ReplayStatus::Success);
        assert!(result.policy_violations.is_empty());
    }
}