use crate::error::{Error, Result};
use crate::trace::event::TraceEvent;
use crate::types::{ObligationId, RegionId, TaskId};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, info, warn};
/// Tuning knobs shared by [`TraceMinimizer`] and [`ReplayOptimizer`].
///
/// Field order is part of the serialized form; keep it stable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MinimizationConfig {
    /// Upper bound on the number of delta-debugging passes.
    pub max_iterations: usize,
    /// Lower bound on the size of the event chunks delta debugging tries to remove.
    pub min_chunk_size: usize,
    /// NOTE(review): not read anywhere in this module; only set by the factories.
    pub aggressive_pruning: bool,
    /// When `false`, `ReplayOptimizer::optimize` also runs its timing-compression pass.
    pub preserve_timing: bool,
    /// NOTE(review): not read anywhere in this module; only set by the factories.
    pub target_reduction: f64,
    /// NOTE(review): not read anywhere in this module; presumably a per-replay
    /// timeout in milliseconds — confirm with the validator implementations.
    pub replay_timeout_ms: u64,
}

impl Default for MinimizationConfig {
    /// 1000 passes, single-event minimum chunks, pruning and timing preservation
    /// enabled, 0.1 target reduction and a 5000 ms replay timeout.
    fn default() -> Self {
        MinimizationConfig {
            replay_timeout_ms: 5_000,
            target_reduction: 0.1,
            preserve_timing: true,
            aggressive_pruning: true,
            min_chunk_size: 1,
            max_iterations: 1_000,
        }
    }
}
/// Summary produced by one call to `TraceMinimizer::minimize`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MinimizationResult {
/// Number of events in the trace passed to `minimize`.
pub original_size: usize,
/// Number of events remaining after minimization.
pub minimized_size: usize,
/// `1 - minimized_size / original_size`, or `0.0` when the input was empty.
pub reduction_ratio: f64,
/// Iteration count reported by `minimize`.
pub iterations: usize,
/// Wall-clock time spent inside `minimize`, in milliseconds.
pub duration_ms: u64,
/// Event indices that `minimize` reports as kept.
pub essential_events: Vec<usize>,
/// Event indices that `minimize` reports as removed.
pub pruned_events: Vec<usize>,
}
/// Algorithm `TraceMinimizer` uses to shrink a trace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MinimizationStrategy {
/// Repeatedly removes chunks of events, keeping removals the validator accepts.
DeltaDebugging,
/// Keeps target events plus their transitive dependencies (per-entity ordering,
/// causal pairs and semantic links such as region parents, timers, monitors, links).
DependencyPruning,
/// Keeps target events plus their transitive causal predecessors only.
CausalCone,
/// Dependency pruning, then the causal cone (when more than 100 events remain),
/// then delta debugging (when more than 10 events remain).
Hybrid,
}
/// Oracle deciding whether a candidate trace still exhibits the behavior being
/// minimized for.
pub trait ReplayValidator: Send + Sync {
/// Returns `Ok(true)` if replaying `events` still reproduces the target.
/// Errors are propagated by the minimizer's validation step via `?`.
fn validate_replay(&self, events: &[TraceEvent]) -> Result<bool>;
/// Short description of what this validator checks for.
fn target_description(&self) -> String;
}
pub struct TraceMinimizer {
config: MinimizationConfig,
validator: Arc<dyn ReplayValidator>,
strategy: MinimizationStrategy,
cache: HashMap<Vec<usize>, bool>,
}
impl TraceMinimizer {
pub fn new(
config: MinimizationConfig,
validator: Arc<dyn ReplayValidator>,
strategy: MinimizationStrategy,
) -> Self {
Self {
config,
validator,
strategy,
cache: HashMap::new(),
}
}
pub async fn minimize(&mut self, events: Vec<TraceEvent>) -> Result<MinimizationResult> {
let start_time = Instant::now();
let original_size = events.len();
info!(
"Starting trace minimization: {} events, strategy: {:?}",
original_size, self.strategy
);
let minimized_events = match self.strategy {
MinimizationStrategy::DeltaDebugging => self.delta_debugging_minimize(events).await?,
MinimizationStrategy::DependencyPruning => {
self.dependency_pruning_minimize(events).await?
}
MinimizationStrategy::CausalCone => self.causal_cone_minimize(events).await?,
MinimizationStrategy::Hybrid => self.hybrid_minimize(events).await?,
};
let minimized_size = minimized_events.len();
let reduction_ratio = if original_size > 0 {
1.0 - (minimized_size as f64 / original_size as f64)
} else {
0.0
};
let duration = start_time.elapsed();
let essential_events: Vec<usize> = minimized_events
.iter()
.enumerate()
.map(|(i, _)| i)
.collect();
let pruned_events: Vec<usize> = (minimized_size..original_size).collect();
let result = MinimizationResult {
original_size,
minimized_size,
reduction_ratio,
iterations: self.cache.len(),
duration_ms: duration.as_millis() as u64,
essential_events,
pruned_events,
};
info!(
"Minimization complete: {} -> {} events ({:.1}% reduction)",
original_size,
minimized_size,
reduction_ratio * 100.0
);
Ok(result)
}
async fn delta_debugging_minimize(
&mut self,
events: Vec<TraceEvent>,
) -> Result<Vec<TraceEvent>> {
let mut current = events;
let mut changed = true;
let mut iteration = 0;
while changed && iteration < self.config.max_iterations {
changed = false;
iteration += 1;
debug!(
"Delta debugging iteration {}, {} events",
iteration,
current.len()
);
let chunk_size = std::cmp::max(self.config.min_chunk_size, current.len() / 4);
for start in (0..current.len()).step_by(chunk_size) {
let end = std::cmp::min(start + chunk_size, current.len());
let mut candidate = current.clone();
candidate.drain(start..end);
if self.validate_candidate(&candidate).await? {
current = candidate;
changed = true;
break; }
}
}
Ok(current)
}
async fn dependency_pruning_minimize(
&mut self,
events: Vec<TraceEvent>,
) -> Result<Vec<TraceEvent>> {
let dependencies = self.compute_dependencies(&events);
let mut essential = HashSet::new();
for (i, event) in events.iter().enumerate() {
if self.is_target_event(event) {
self.mark_dependencies_recursive(&dependencies, i, &mut essential);
}
}
let minimized: Vec<TraceEvent> = events
.into_iter()
.enumerate()
.filter_map(|(i, event)| {
if essential.contains(&i) {
Some(event)
} else {
None
}
})
.collect();
if !self.validate_candidate(&minimized).await? {
warn!("Dependency pruning produced invalid trace, falling back to original");
return Err(Error::internal("Dependency pruning failed validation"));
}
Ok(minimized)
}
async fn causal_cone_minimize(&mut self, events: Vec<TraceEvent>) -> Result<Vec<TraceEvent>> {
let causal_graph = self.build_causal_graph(&events);
let target_events = self.find_target_events(&events);
let mut reachable = HashSet::new();
let mut queue = VecDeque::new();
for &target in &target_events {
queue.push_back(target);
reachable.insert(target);
}
while let Some(current) = queue.pop_front() {
if let Some(predecessors) = causal_graph.get(¤t) {
for &pred in predecessors {
if reachable.insert(pred) {
queue.push_back(pred);
}
}
}
}
let minimized: Vec<TraceEvent> = events
.into_iter()
.enumerate()
.filter_map(|(i, event)| {
if reachable.contains(&i) {
Some(event)
} else {
None
}
})
.collect();
if !self.validate_candidate(&minimized).await? {
warn!("Causal cone minimization produced invalid trace, falling back");
return Err(Error::internal(
"Causal cone minimization failed validation",
));
}
Ok(minimized)
}
async fn hybrid_minimize(&mut self, events: Vec<TraceEvent>) -> Result<Vec<TraceEvent>> {
let mut current = self
.dependency_pruning_minimize(events.clone())
.await
.unwrap_or(events);
if current.len() > 100 {
let fallback = current.clone();
current = self.causal_cone_minimize(current).await.unwrap_or(fallback);
}
if current.len() > 10 {
let fallback = current.clone();
current = self
.delta_debugging_minimize(current)
.await
.unwrap_or(fallback);
}
Ok(current)
}
async fn validate_candidate(&mut self, events: &[TraceEvent]) -> Result<bool> {
let key: Vec<usize> = events.iter().enumerate().map(|(i, _)| i).collect();
if let Some(&cached) = self.cache.get(&key) {
return Ok(cached);
}
let result = self.validator.validate_replay(events)?;
self.cache.insert(key, result);
Ok(result)
}
fn compute_dependencies(&self, events: &[TraceEvent]) -> HashMap<usize, Vec<usize>> {
let mut dependencies: HashMap<usize, Vec<usize>> = HashMap::new();
let mut task_events = HashMap::<TaskId, Vec<usize>>::new();
let mut region_events = HashMap::<RegionId, Vec<usize>>::new();
let mut obligation_events = HashMap::<ObligationId, Vec<usize>>::new();
for (i, event) in events.iter().enumerate() {
if let Some(task_id) = self.extract_task_id(event) {
task_events.entry(task_id).or_default().push(i);
}
if let Some(region_id) = self.extract_region_id(event) {
region_events.entry(region_id).or_default().push(i);
}
if let Some(obligation_id) = self.extract_obligation_id(event) {
obligation_events.entry(obligation_id).or_default().push(i);
}
}
for event_list in task_events.values() {
for window in event_list.windows(2) {
let (first, second) = (window[0], window[1]);
dependencies.entry(second).or_default().push(first);
}
}
for event_list in region_events.values() {
for window in event_list.windows(2) {
let (first, second) = (window[0], window[1]);
dependencies.entry(second).or_default().push(first);
}
}
for event_list in obligation_events.values() {
for window in event_list.windows(2) {
let (first, second) = (window[0], window[1]);
dependencies.entry(second).or_default().push(first);
}
}
for i in 0..events.len() {
for j in 0..i {
if self.has_causal_relationship(&events[j], &events[i]) {
dependencies.entry(i).or_default().push(j);
}
}
}
self.add_semantic_dependencies(events, &mut dependencies);
dependencies
}
fn mark_dependencies_recursive(
&self,
dependencies: &HashMap<usize, Vec<usize>>,
event_idx: usize,
essential: &mut HashSet<usize>,
) {
if !essential.insert(event_idx) {
return; }
if let Some(deps) = dependencies.get(&event_idx) {
for &dep in deps {
self.mark_dependencies_recursive(dependencies, dep, essential);
}
}
}
fn build_causal_graph(&self, events: &[TraceEvent]) -> HashMap<usize, Vec<usize>> {
let mut graph: HashMap<usize, Vec<usize>> = HashMap::new();
for i in 0..events.len() {
for j in 0..i {
if self.has_causal_relationship(&events[j], &events[i]) {
graph.entry(i).or_default().push(j);
}
}
}
graph
}
fn is_target_event(&self, event: &TraceEvent) -> bool {
use crate::trace::event::TraceEventKind::*;
match event.kind {
ObligationLeak | ObligationAbort | FuturelockDetected => true,
CancelRequest | CancelAck => true,
RegionCloseBegin | RegionCloseComplete | RegionCancelled => true,
WorkerCancelRequested | WorkerDrainCompleted | WorkerFinalizeCompleted => true,
DownDelivered | ExitDelivered => true,
IoError => true,
UserTrace | Checkpoint => true,
Spawn
| Schedule
| Yield
| Wake
| Poll
| Complete
| RegionCreated
| ObligationReserve
| ObligationCommit
| TimeAdvance
| TimerScheduled
| TimerFired
| TimerCancelled
| IoRequested
| IoReady
| IoResult
| RngSeed
| RngValue
| ChaosInjection
| MonitorCreated
| MonitorDropped
| LinkCreated
| LinkDropped
| WorkerCancelAcknowledged
| WorkerDrainStarted => false,
}
}
fn find_target_events(&self, events: &[TraceEvent]) -> Vec<usize> {
events
.iter()
.enumerate()
.filter_map(|(i, event)| {
if self.is_target_event(event) {
Some(i)
} else {
None
}
})
.collect()
}
fn extract_task_id(&self, event: &TraceEvent) -> Option<TaskId> {
use crate::trace::event::TraceData;
match &event.data {
TraceData::Task { task, .. } => Some(*task),
TraceData::Cancel { task, .. } => Some(*task),
TraceData::Obligation { task, .. } => Some(*task),
_ => None,
}
}
fn extract_region_id(&self, event: &TraceEvent) -> Option<RegionId> {
use crate::trace::event::TraceData;
match &event.data {
TraceData::Task { region, .. } => Some(*region),
TraceData::Region { region, .. } => Some(*region),
TraceData::Cancel { region, .. } => Some(*region),
TraceData::Obligation { region, .. } => Some(*region),
_ => None,
}
}
fn extract_obligation_id(&self, event: &TraceEvent) -> Option<ObligationId> {
use crate::trace::event::TraceData;
match &event.data {
TraceData::Obligation { obligation, .. } => Some(*obligation),
_ => None,
}
}
fn add_semantic_dependencies(
&self,
events: &[TraceEvent],
dependencies: &mut HashMap<usize, Vec<usize>>,
) {
use crate::trace::event::TraceEventKind::*;
for (child_idx, child_event) in events.iter().enumerate() {
if self.extract_region_id(child_event).is_some() {
for (parent_idx, parent_event) in events.iter().enumerate().take(child_idx) {
if parent_event.kind == RegionCreated {
if let Some(parent_region) = self.extract_region_id(parent_event) {
if let crate::trace::event::TraceData::Region {
parent: Some(p), ..
} = &child_event.data
{
if *p == parent_region {
dependencies.entry(child_idx).or_default().push(parent_idx);
}
}
}
}
}
}
}
let mut timer_scheduled_indices = Vec::new();
for (i, event) in events.iter().enumerate() {
match event.kind {
TimerScheduled => timer_scheduled_indices.push(i),
TimerFired | TimerCancelled => {
if let Some(&last_scheduled) = timer_scheduled_indices.last() {
dependencies.entry(i).or_default().push(last_scheduled);
}
}
_ => {}
}
}
let mut monitor_created_indices = Vec::new();
let mut link_created_indices = Vec::new();
for (i, event) in events.iter().enumerate() {
match event.kind {
MonitorCreated => monitor_created_indices.push(i),
LinkCreated => link_created_indices.push(i),
DownDelivered => {
for &monitor_idx in &monitor_created_indices {
if monitor_idx < i {
dependencies.entry(i).or_default().push(monitor_idx);
}
}
}
ExitDelivered => {
for &link_idx in &link_created_indices {
if link_idx < i {
dependencies.entry(i).or_default().push(link_idx);
}
}
}
_ => {}
}
}
}
fn has_causal_relationship(&self, first: &TraceEvent, second: &TraceEvent) -> bool {
use crate::trace::event::{TraceData, TraceEventKind::*};
if let (Some(first_time), Some(second_time)) = (&first.logical_time, &second.logical_time) {
if first_time < second_time {
return true;
}
}
match (&first.kind, &second.kind, &first.data, &second.data) {
(
Spawn,
Schedule,
TraceData::Task { task: task1, .. },
TraceData::Task { task: task2, .. },
)
| (
Schedule,
Poll,
TraceData::Task { task: task1, .. },
TraceData::Task { task: task2, .. },
)
| (
Poll,
Complete,
TraceData::Task { task: task1, .. },
TraceData::Task { task: task2, .. },
) if task1 == task2 => true,
(
Wake,
Schedule,
TraceData::Task { task: task1, .. },
TraceData::Task { task: task2, .. },
) if task1 == task2 => true,
(
CancelRequest,
CancelAck,
TraceData::Cancel { task: task1, .. },
TraceData::Cancel { task: task2, .. },
) if task1 == task2 => true,
(
RegionCreated,
RegionCloseBegin,
TraceData::Region {
region: region1, ..
},
TraceData::Region {
region: region2, ..
},
)
| (
RegionCloseBegin,
RegionCloseComplete,
TraceData::Region {
region: region1, ..
},
TraceData::Region {
region: region2, ..
},
) if region1 == region2 => true,
(
RegionCreated,
_,
TraceData::Region { region: parent, .. },
TraceData::Region {
parent: Some(child_parent),
..
},
) if parent == child_parent => true,
(
ObligationReserve,
ObligationCommit,
TraceData::Obligation {
obligation: obl1, ..
},
TraceData::Obligation {
obligation: obl2, ..
},
)
| (
ObligationReserve,
ObligationAbort,
TraceData::Obligation {
obligation: obl1, ..
},
TraceData::Obligation {
obligation: obl2, ..
},
) if obl1 == obl2 => true,
(TimerScheduled, TimerFired, _, _) | (TimerScheduled, TimerCancelled, _, _) => true,
(IoRequested, IoReady, _, _) | (IoReady, IoResult, _, _) | (IoReady, IoError, _, _) => {
true
}
(WorkerCancelRequested, WorkerCancelAcknowledged, _, _)
| (WorkerCancelAcknowledged, WorkerDrainStarted, _, _)
| (WorkerDrainStarted, WorkerDrainCompleted, _, _)
| (WorkerDrainCompleted, WorkerFinalizeCompleted, _, _) => true,
(MonitorCreated, DownDelivered, _, _) | (LinkCreated, ExitDelivered, _, _) => true,
_ => false,
}
}
}
#[derive(Debug)]
pub struct ReplayOptimizer {
config: MinimizationConfig,
}
impl ReplayOptimizer {
pub fn new(config: MinimizationConfig) -> Self {
Self { config }
}
pub async fn optimize(&self, events: Vec<TraceEvent>) -> Result<Vec<TraceEvent>> {
let mut optimized = events;
optimized = self.remove_redundant_events(optimized)?;
if !self.config.preserve_timing {
optimized = self.compress_timing(optimized)?;
}
optimized = self.merge_compatible_events(optimized)?;
Ok(optimized)
}
fn remove_redundant_events(&self, events: Vec<TraceEvent>) -> Result<Vec<TraceEvent>> {
let mut result = Vec::new();
let mut seen_states = HashSet::new();
for event in events {
let state_key = self.compute_state_key(&event)?;
if seen_states.insert(state_key) {
result.push(event);
}
}
Ok(result)
}
fn compress_timing(&self, events: Vec<TraceEvent>) -> Result<Vec<TraceEvent>> {
Ok(events)
}
fn merge_compatible_events(&self, events: Vec<TraceEvent>) -> Result<Vec<TraceEvent>> {
Ok(events)
}
fn extract_task_id(&self, event: &TraceEvent) -> Option<TaskId> {
use crate::trace::event::TraceData;
match &event.data {
TraceData::Task { task, .. } => Some(*task),
TraceData::Cancel { task, .. } => Some(*task),
TraceData::Obligation { task, .. } => Some(*task),
_ => None,
}
}
fn extract_region_id(&self, event: &TraceEvent) -> Option<RegionId> {
use crate::trace::event::TraceData;
match &event.data {
TraceData::Task { region, .. } => Some(*region),
TraceData::Region { region, .. } => Some(*region),
TraceData::Cancel { region, .. } => Some(*region),
TraceData::Obligation { region, .. } => Some(*region),
_ => None,
}
}
fn extract_obligation_id(&self, event: &TraceEvent) -> Option<ObligationId> {
use crate::trace::event::TraceData;
match &event.data {
TraceData::Obligation { obligation, .. } => Some(*obligation),
_ => None,
}
}
fn compute_state_key(&self, event: &TraceEvent) -> Result<String> {
use crate::trace::event::TraceEventKind;
let kind_str = match event.kind {
TraceEventKind::Spawn => "spawn",
TraceEventKind::Schedule => "schedule",
TraceEventKind::Poll => "poll",
TraceEventKind::Complete => "complete",
TraceEventKind::CancelRequest => "cancel_req",
TraceEventKind::CancelAck => "cancel_ack",
TraceEventKind::RegionCreated => "region_created",
TraceEventKind::RegionCloseBegin => "region_close_begin",
TraceEventKind::RegionCloseComplete => "region_close_complete",
TraceEventKind::ObligationReserve => "obl_reserve",
TraceEventKind::ObligationCommit => "obl_commit",
TraceEventKind::ObligationAbort => "obl_abort",
TraceEventKind::ObligationLeak => "obl_leak",
TraceEventKind::FuturelockDetected => "futurelock",
_ => "other",
};
let task_id = self
.extract_task_id(event)
.map(|id| format!("_t{}", id.as_u64()))
.unwrap_or_default();
let region_id = self
.extract_region_id(event)
.map(|id| format!("_r{}", id.as_u64()))
.unwrap_or_default();
let obligation_id = self
.extract_obligation_id(event)
.map(|id| format!("_o{}", id.as_u64()))
.unwrap_or_default();
Ok(format!(
"{}{}{}{}",
kind_str, task_id, region_id, obligation_id
))
}
}
pub struct MinimizerFactory;
impl MinimizerFactory {
pub fn for_bug_reproduction(bug_validator: Arc<dyn ReplayValidator>) -> TraceMinimizer {
let config = MinimizationConfig {
aggressive_pruning: true,
target_reduction: 0.05, ..Default::default()
};
TraceMinimizer::new(config, bug_validator, MinimizationStrategy::Hybrid)
}
pub fn for_performance_analysis(perf_validator: Arc<dyn ReplayValidator>) -> TraceMinimizer {
let config = MinimizationConfig {
preserve_timing: true, target_reduction: 0.3, ..Default::default()
};
TraceMinimizer::new(config, perf_validator, MinimizationStrategy::CausalCone)
}
pub fn for_race_conditions(race_validator: Arc<dyn ReplayValidator>) -> TraceMinimizer {
let config = MinimizationConfig {
preserve_timing: true, aggressive_pruning: false, target_reduction: 0.5,
..Default::default()
};
TraceMinimizer::new(
config,
race_validator,
MinimizationStrategy::DependencyPruning,
)
}
}
pub mod utils {
use super::*;
pub async fn load_trace(path: &Path) -> Result<Vec<TraceEvent>> {
let content = tokio::fs::read_to_string(path)
.await
.map_err(|e| Error::internal(format!("failed to read trace file: {e}")))?;
let events: Vec<TraceEvent> = content
.lines()
.filter_map(|line| serde_json::from_str(line).ok())
.collect();
Ok(events)
}
pub async fn save_trace(events: &[TraceEvent], path: &Path) -> Result<()> {
let mut lines = Vec::new();
for event in events {
lines.push(
serde_json::to_string(event).map_err(|e| {
Error::internal(format!("failed to serialize trace event: {e}"))
})?,
);
}
let content = lines.join("\n");
tokio::fs::write(path, content)
.await
.map_err(|e| Error::internal(format!("failed to write trace file: {e}")))?;
Ok(())
}
pub fn compute_trace_stats(events: &[TraceEvent]) -> TraceStatistics {
let mut stats = TraceStatistics::default();
stats.total_events = events.len();
stats
}
#[derive(Debug, Default)]
pub struct TraceStatistics {
pub total_events: usize,
pub unique_tasks: usize,
pub unique_regions: usize,
pub duration_ms: u64,
pub event_types: HashMap<String, usize>,
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::trace::event::TraceEvent;

    /// Validator returning the same verdict for every candidate trace.
    struct MockValidator {
        should_pass: bool,
    }

    impl ReplayValidator for MockValidator {
        fn validate_replay(&self, _events: &[TraceEvent]) -> Result<bool> {
            Ok(self.should_pass)
        }
        fn target_description(&self) -> String {
            "Deterministic validation".to_string()
        }
    }

    fn mock_minimizer(should_pass: bool, strategy: MinimizationStrategy) -> TraceMinimizer {
        TraceMinimizer::new(
            MinimizationConfig::default(),
            Arc::new(MockValidator { should_pass }),
            strategy,
        )
    }

    #[tokio::test]
    async fn test_delta_debugging_minimization() {
        let mut minimizer = mock_minimizer(true, MinimizationStrategy::DeltaDebugging);
        let result = match minimizer.minimize(Vec::new()).await {
            Ok(result) => result,
            Err(_) => panic!("minimizing an empty trace should succeed"),
        };
        assert_eq!(result.original_size, 0);
        assert_eq!(result.minimized_size, 0);
        assert_eq!(result.reduction_ratio, 0.0);
        assert!(result.essential_events.is_empty());
        assert!(result.pruned_events.is_empty());
    }

    #[tokio::test]
    async fn test_pruning_strategies_report_failed_validation() {
        for strategy in [
            MinimizationStrategy::DependencyPruning,
            MinimizationStrategy::CausalCone,
        ] {
            let mut minimizer = mock_minimizer(false, strategy);
            assert!(minimizer.minimize(Vec::new()).await.is_err());
        }
    }

    #[tokio::test]
    async fn test_hybrid_falls_back_when_phases_fail() {
        let mut minimizer = mock_minimizer(false, MinimizationStrategy::Hybrid);
        let result = match minimizer.minimize(Vec::new()).await {
            Ok(result) => result,
            Err(_) => panic!("hybrid minimization should fall back instead of failing"),
        };
        assert_eq!(result.minimized_size, 0);
    }

    #[test]
    fn test_minimization_config() {
        let config = MinimizationConfig {
            max_iterations: 500,
            target_reduction: 0.2,
            ..Default::default()
        };
        assert_eq!(config.max_iterations, 500);
        assert_eq!(config.target_reduction, 0.2);
        assert_eq!(config.min_chunk_size, 1);
    }

    #[tokio::test]
    async fn test_replay_optimizer() {
        let optimizer = ReplayOptimizer::new(MinimizationConfig::default());
        match optimizer.optimize(Vec::new()).await {
            Ok(events) => assert!(events.is_empty()),
            Err(_) => panic!("optimizing an empty trace should succeed"),
        }
    }

    #[test]
    fn test_minimizer_factory() {
        let validator: Arc<dyn ReplayValidator> = Arc::new(MockValidator { should_pass: true });

        let bug = MinimizerFactory::for_bug_reproduction(Arc::clone(&validator));
        assert_eq!(bug.strategy, MinimizationStrategy::Hybrid);
        assert_eq!(bug.config.target_reduction, 0.05);

        let perf = MinimizerFactory::for_performance_analysis(Arc::clone(&validator));
        assert_eq!(perf.strategy, MinimizationStrategy::CausalCone);
        assert!(perf.config.preserve_timing);

        let race = MinimizerFactory::for_race_conditions(validator);
        assert_eq!(race.strategy, MinimizationStrategy::DependencyPruning);
        assert!(!race.config.aggressive_pruning);
        assert_eq!(race.config.target_reduction, 0.5);
    }
}