Skip to main content

briefcase_core/replay/
sync.rs

//! Synchronous replay engine implementation
//!
//! This module provides a synchronous variant of the replay engine
//! for use cases that don't require async functionality.
5
6use super::{
7    ChangeType, Comparator, ExecutionConfig, FieldChange, PolicyViolation, ReplayError, ReplayMode,
8    ReplayPolicy, ReplayResult, ReplayStats, ReplayStatus, SnapshotDiff, SyncModelExecutor,
9};
10use crate::models::DecisionSnapshot;
11use crate::storage::sync::SyncStorageBackend;
12use std::sync::Arc;
13use std::time::Instant;
14
15/// Synchronous replay engine
16pub struct SyncReplayEngine<S: SyncStorageBackend> {
17    storage: S,
18    default_mode: ReplayMode,
19    executor: Option<Arc<dyn SyncModelExecutor>>,
20}
21
22impl<S: SyncStorageBackend> SyncReplayEngine<S> {
23    /// Create a new synchronous replay engine
24    pub fn new(storage: S) -> Self {
25        Self {
26            storage,
27            default_mode: ReplayMode::Tolerant,
28            executor: None,
29        }
30    }
31
32    /// Create a replay engine with a specific default mode
33    pub fn with_mode(storage: S, mode: ReplayMode) -> Self {
34        Self {
35            storage,
36            default_mode: mode,
37            executor: None,
38        }
39    }
40
41    /// Set a model executor for the replay engine
42    pub fn with_executor(mut self, executor: Arc<dyn SyncModelExecutor>) -> Self {
43        self.executor = Some(executor);
44        self
45    }
46
47    /// Get a reference to the current executor, if any
48    pub fn executor(&self) -> Option<&dyn SyncModelExecutor> {
49        self.executor.as_ref().map(|arc| arc.as_ref())
50    }
51
52    /// Get the default replay mode
53    pub fn default_mode(&self) -> ReplayMode {
54        self.default_mode.clone()
55    }
56
57    /// Replay a snapshot by ID
58    pub fn replay(
59        &self,
60        snapshot_id: &str,
61        mode: Option<ReplayMode>,
62        _context_overrides: Option<serde_json::Value>,
63    ) -> Result<ReplayResult, ReplayError> {
64        let start_time = Instant::now();
65        let replay_mode = mode.unwrap_or_else(|| self.default_mode());
66
67        let original_snapshot = self
68            .storage
69            .load_decision(snapshot_id)
70            .map_err(|e| ReplayError::StorageError(e.to_string()))?;
71
72        match replay_mode {
73            ReplayMode::ValidationOnly => {
74                // Just validate without re-executing
75                Ok(ReplayResult {
76                    status: ReplayStatus::Success,
77                    original_snapshot,
78                    replay_output: None,
79                    outputs_match: true, // Assume match for validation-only
80                    diff: None,
81                    policy_violations: Vec::new(),
82                    execution_time_ms: start_time.elapsed().as_millis() as f64,
83                })
84            }
85            ReplayMode::Strict | ReplayMode::Tolerant => {
86                // Use the executor if available, otherwise fall back to simulation
87                if self.executor.is_some() {
88                    self.execute_replay(&original_snapshot, replay_mode, start_time)
89                } else {
90                    // For sync implementation, we'll simulate replay
91                    let simulated_output = simulate_execution(&original_snapshot);
92                    let outputs_match =
93                        compare_outputs(&original_snapshot, &simulated_output, &replay_mode);
94
95                    Ok(ReplayResult {
96                        status: if outputs_match {
97                            ReplayStatus::Success
98                        } else {
99                            ReplayStatus::Failed
100                        },
101                        original_snapshot,
102                        replay_output: Some(simulated_output),
103                        outputs_match,
104                        diff: None, // TODO: Implement diff calculation
105                        policy_violations: Vec::new(),
106                        execution_time_ms: start_time.elapsed().as_millis() as f64,
107                    })
108                }
109            }
110        }
111    }
112
113    /// Replay with policy validation
114    pub fn replay_with_policy(
115        &self,
116        snapshot_id: &str,
117        policy: &ReplayPolicy,
118        mode: Option<ReplayMode>,
119    ) -> Result<ReplayResult, ReplayError> {
120        let mut result = self.replay(snapshot_id, mode, None)?;
121
122        // Validate against policy
123        let violations = self.validate(snapshot_id, policy)?;
124        result.policy_violations = violations;
125
126        if !result.policy_violations.is_empty() {
127            result.status = ReplayStatus::Failed;
128        }
129
130        Ok(result)
131    }
132
133    /// Validate a snapshot against a policy
134    pub fn validate(
135        &self,
136        snapshot_id: &str,
137        policy: &ReplayPolicy,
138    ) -> Result<Vec<PolicyViolation>, ReplayError> {
139        let snapshot = self
140            .storage
141            .load_decision(snapshot_id)
142            .map_err(|e| ReplayError::StorageError(e.to_string()))?;
143
144        let mut violations = Vec::new();
145
146        // Apply policy rules
147        for rule in &policy.rules {
148            match rule.comparator {
149                Comparator::ExactMatch => {
150                    // For exact match, we'd need to compare with expected values
151                    // This is a simplified implementation
152                    if rule.field == "function_name" {
153                        // Example validation logic
154                        if snapshot.function_name.is_empty() {
155                            violations.push(PolicyViolation {
156                                rule_name: format!("exact_match_{}", rule.field),
157                                field: rule.field.clone(),
158                                expected: "non-empty function name".to_string(),
159                                actual: "empty".to_string(),
160                                message: "Function name cannot be empty".to_string(),
161                            });
162                        }
163                    }
164                }
165                Comparator::SemanticSimilarity => {
166                    // For similarity threshold, we'd need reference values
167                    // This is a simplified implementation
168                    if rule.field == "output" && snapshot.outputs.is_empty() {
169                        violations.push(PolicyViolation {
170                            rule_name: format!("similarity_{}", rule.field),
171                            field: rule.field.clone(),
172                            expected: "at least one output".to_string(),
173                            actual: "no outputs".to_string(),
174                            message: "At least one output is required".to_string(),
175                        });
176                    }
177                }
178                Comparator::MaxIncreasePercent => {
179                    // Placeholder for cost/performance checks
180                    // Would need historical data for comparison
181                }
182                Comparator::MaxDecreasePercent => {
183                    // Placeholder for cost/performance checks
184                    // Would need historical data for comparison
185                }
186                Comparator::WithinRange => {
187                    // Placeholder for range checks
188                    // Would need expected ranges for comparison
189                }
190            }
191        }
192
193        Ok(violations)
194    }
195
196    /// Execute replay with a model executor, if available
197    fn execute_replay(
198        &self,
199        original: &DecisionSnapshot,
200        mode: ReplayMode,
201        start_time: Instant,
202    ) -> Result<ReplayResult, ReplayError> {
203        if let Some(ref executor) = self.executor {
204            // Check model support
205            if let Some(ref params) = original.model_parameters {
206                if !executor.supports_model(&params.model_name) {
207                    return Err(ReplayError::ExecutionError(format!(
208                        "Executor '{}' does not support model '{}'",
209                        executor.executor_name(),
210                        params.model_name
211                    )));
212                }
213            }
214
215            // Execute with the configured default config
216            let config = ExecutionConfig::default();
217            let exec_result = executor.execute(
218                &original.inputs,
219                original.model_parameters.as_ref(),
220                &original.context,
221                &config,
222            )?;
223
224            let execution_time = start_time.elapsed().as_millis() as f64;
225
226            // Compare outputs based on mode
227            let tolerance = match mode {
228                ReplayMode::Strict => 1.0,         // Exact match
229                ReplayMode::Tolerant => 0.8,       // 80% similarity
230                ReplayMode::ValidationOnly => 0.0, // Don't care about outputs
231            };
232
233            let comparison =
234                executor.compare_outputs(&original.outputs, &exec_result.outputs, tolerance);
235
236            let replay_output = serde_json::to_value(&exec_result.outputs).ok();
237
238            Ok(ReplayResult {
239                status: if comparison.is_match {
240                    ReplayStatus::Success
241                } else {
242                    ReplayStatus::Failed
243                },
244                original_snapshot: original.clone(),
245                replay_output,
246                outputs_match: comparison.is_match,
247                diff: Some(SnapshotDiff {
248                    inputs_changed: false,
249                    outputs_changed: !comparison.is_match,
250                    model_params_changed: false,
251                    execution_time_delta_ms: execution_time
252                        - original.execution_time_ms.unwrap_or(0.0),
253                    changes: comparison
254                        .field_comparisons
255                        .iter()
256                        .filter(|c| !c.is_match)
257                        .map(|c| FieldChange {
258                            field_path: format!("output.{}", c.field_name),
259                            old_value: c.original_value.clone(),
260                            new_value: c.replayed_value.clone(),
261                            change_type: ChangeType::Modified,
262                        })
263                        .collect(),
264                }),
265                policy_violations: Vec::new(),
266                execution_time_ms: execution_time,
267            })
268        } else {
269            // Fall back to simulation if no executor is set
270            self.simulate_replay(original, mode, start_time)
271        }
272    }
273
274    /// Get replay statistics for a list of snapshots
275    pub fn get_replay_stats(&self, snapshot_ids: &[String]) -> Result<ReplayStats, ReplayError> {
276        let mut total_replays = 0;
277        let mut successful_replays = 0;
278        let mut failed_replays = 0;
279        let mut exact_matches = 0;
280        let mut mismatches = 0;
281        let mut total_execution_time_ms = 0.0;
282
283        for snapshot_id in snapshot_ids {
284            match self.replay(snapshot_id, None, None) {
285                Ok(result) => {
286                    total_replays += 1;
287                    total_execution_time_ms += result.execution_time_ms;
288
289                    match result.status {
290                        ReplayStatus::Success => {
291                            successful_replays += 1;
292                            if result.outputs_match {
293                                exact_matches += 1;
294                            } else {
295                                mismatches += 1;
296                            }
297                        }
298                        _ => {
299                            failed_replays += 1;
300                            mismatches += 1;
301                        }
302                    }
303                }
304                Err(_) => {
305                    total_replays += 1;
306                    failed_replays += 1;
307                    mismatches += 1;
308                }
309            }
310        }
311
312        let average_execution_time_ms = if total_replays > 0 {
313            total_execution_time_ms / total_replays as f64
314        } else {
315            0.0
316        };
317
318        Ok(ReplayStats {
319            total_replays,
320            successful_replays,
321            failed_replays,
322            exact_matches,
323            mismatches,
324            average_execution_time_ms,
325            total_execution_time_ms,
326        })
327    }
328
329    fn simulate_replay(
330        &self,
331        original: &DecisionSnapshot,
332        mode: ReplayMode,
333        start_time: Instant,
334    ) -> Result<ReplayResult, ReplayError> {
335        let simulated_output = simulate_execution(original);
336        let outputs_match = compare_outputs(original, &simulated_output, &mode);
337
338        Ok(ReplayResult {
339            status: if outputs_match {
340                ReplayStatus::Success
341            } else {
342                ReplayStatus::Failed
343            },
344            original_snapshot: original.clone(),
345            replay_output: Some(simulated_output),
346            outputs_match,
347            diff: None,
348            policy_violations: Vec::new(),
349            execution_time_ms: start_time.elapsed().as_millis() as f64,
350        })
351    }
352}
353
354impl<S: SyncStorageBackend> Clone for SyncReplayEngine<S>
355where
356    S: Clone,
357{
358    fn clone(&self) -> Self {
359        Self {
360            storage: self.storage.clone(),
361            default_mode: self.default_mode.clone(),
362            executor: self.executor.clone(),
363        }
364    }
365}
366
367/// Simulate execution for demo purposes
368/// In a real implementation, this would re-execute the decision
369fn simulate_execution(decision: &DecisionSnapshot) -> serde_json::Value {
370    // For now, just return the first output if available
371    if let Some(output) = decision.outputs.first() {
372        output.value.clone()
373    } else {
374        serde_json::Value::Null
375    }
376}
377
378/// Compare outputs based on replay mode
379fn compare_outputs(
380    decision: &DecisionSnapshot,
381    simulated_output: &serde_json::Value,
382    mode: &ReplayMode,
383) -> bool {
384    if let Some(original_output) = decision.outputs.first() {
385        match mode {
386            ReplayMode::Strict => {
387                // Exact match required
388                original_output.value == *simulated_output
389            }
390            ReplayMode::Tolerant => {
391                // Allow some differences (simplified implementation)
392                if original_output.value == *simulated_output {
393                    true
394                } else {
395                    // Could implement fuzzy matching here
396                    false
397                }
398            }
399            ReplayMode::ValidationOnly => {
400                // Always true for validation-only mode
401                true
402            }
403        }
404    } else {
405        simulated_output.is_null()
406    }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::*;
    use crate::storage::sync::MemoryStorageBackend;
    use serde_json::json;

    /// Build a decision with one input, one output, model parameters and a tag.
    fn create_test_decision() -> DecisionSnapshot {
        DecisionSnapshot::new("test_function")
            .with_module("test_module")
            .add_input(Input::new("test_input", json!("value"), "string"))
            .add_output(Output::new("test_output", json!("result"), "string"))
            .with_model_parameters(ModelParameters::new("gpt-4"))
            .add_tag("env", "test")
    }

    /// Engine with default settings backed by fresh in-memory storage.
    fn new_engine() -> SyncReplayEngine<MemoryStorageBackend> {
        SyncReplayEngine::new(MemoryStorageBackend::new())
    }

    /// Save a fresh test decision through the engine's storage and return its id.
    fn store_decision(engine: &SyncReplayEngine<MemoryStorageBackend>) -> String {
        engine
            .storage
            .save_decision(&create_test_decision())
            .unwrap()
    }

    #[test]
    fn test_sync_replay_validation_only() {
        let engine = new_engine();
        let id = store_decision(&engine);

        let outcome = engine
            .replay(&id, Some(ReplayMode::ValidationOnly), None)
            .unwrap();

        assert_eq!(outcome.status, ReplayStatus::Success);
        assert!(outcome.outputs_match);
        assert!(outcome.replay_output.is_none());
    }

    #[test]
    fn test_sync_replay_tolerant_mode() {
        let engine = new_engine();
        let id = store_decision(&engine);

        let outcome = engine
            .replay(&id, Some(ReplayMode::Tolerant), None)
            .unwrap();

        assert_eq!(outcome.status, ReplayStatus::Success);
        assert!(outcome.replay_output.is_some());
    }

    #[test]
    fn test_sync_replay_stats() {
        let engine = new_engine();
        let first_id = store_decision(&engine);
        let second_id = store_decision(&engine);

        let stats = engine.get_replay_stats(&[first_id, second_id]).unwrap();

        assert_eq!(stats.total_replays, 2);
        assert!(stats.total_execution_time_ms >= 0.0);
        assert!(stats.average_execution_time_ms >= 0.0);
    }

    #[test]
    fn test_sync_replay_policy_validation() {
        let engine = new_engine();
        let policy = ReplayPolicy::new("test_policy".to_string())
            .with_exact_match("function_name".to_string());
        let id = store_decision(&engine);

        let violations = engine.validate(&id, &policy).unwrap();

        // Should pass validation since function name is not empty
        assert!(violations.is_empty());
    }
}