use crate::atp::object::ObjectId;
use crate::atp::repair_scheduler::MultiSourceRepairScheduler;
use crate::error::Result;
use crate::types::TraceId;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
#[cfg(feature = "tracing-integration")]
use tracing::{debug, info};
// Fallback `debug!` used when the `tracing-integration` feature is disabled:
// it accepts any token sequence and expands to nothing, so logging call sites
// compile without the `tracing` crate.
#[cfg(not(feature = "tracing-integration"))]
macro_rules! debug {
($($arg:tt)*) => {};
}
// Fallback `info!` used when the `tracing-integration` feature is disabled:
// it accepts any token sequence and expands to nothing, so logging call sites
// compile without the `tracing` crate.
#[cfg(not(feature = "tracing-integration"))]
macro_rules! info {
($($arg:tt)*) => {};
}
/// Tunable settings for [`RepairCoordinator`].
///
/// Default values are supplied by the `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairCoordinatorConfig {
/// Minimum `roi_ratio` a candidate mode must reach before
/// `decide_repair_mode` will select it.
pub min_roi_threshold: f64,
/// Configured default mode.
/// NOTE(review): not read by the coordinator code in this file.
pub default_repair_mode: RepairMode,
/// CPU budget for encoding.
/// NOTE(review): not enforced by the coordinator code in this file.
pub max_encode_cpu_budget: Duration,
/// Memory overhead ratio limit.
/// NOTE(review): not enforced by the coordinator code in this file.
pub max_memory_overhead_ratio: f64,
/// Telemetry switch.
/// NOTE(review): `record_telemetry` in this file does not consult it.
pub enable_telemetry: bool,
/// Controls which decisions `log_decision` emits and at which level.
pub decision_logging_level: RepairLoggingLevel,
/// Limit handed to the decision rate limiter (its window is 60 seconds).
pub max_decisions_per_minute: u32,
/// Limit handed to the telemetry rate limiter (its window is 60 seconds).
pub max_telemetry_per_minute: u32,
/// Size above which the stored decision history is trimmed.
pub max_decision_history: usize,
/// Size above which the stored telemetry history is trimmed.
pub max_telemetry_history: usize,
}
impl Default for RepairCoordinatorConfig {
    /// Returns the stock configuration: repair disabled by default, an ROI
    /// threshold of 1.2, telemetry enabled, normal logging, 60 decisions and
    /// 120 telemetry records per minute, and history caps of 100 and 500.
    fn default() -> Self {
        Self {
            min_roi_threshold: 1.2,
            default_repair_mode: RepairMode::Off,
            max_encode_cpu_budget: Duration::from_millis(500),
            max_memory_overhead_ratio: 0.1,
            enable_telemetry: true,
            decision_logging_level: RepairLoggingLevel::Normal,
            max_decisions_per_minute: 60,
            max_telemetry_per_minute: 120,
            max_decision_history: 100,
            max_telemetry_history: 500,
        }
    }
}
/// Verbosity of repair-decision logging, interpreted by `log_decision`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum RepairLoggingLevel {
/// Log nothing.
Off,
/// Log at info level, but only decisions whose mode is not `RepairMode::Off`.
Normal,
/// Log every decision at info level.
Verbose,
/// Log every decision at debug level, including the ROI ratio.
Debug,
}
/// Repair strategies the coordinator can choose between.
///
/// Human-readable text for each variant is available from `description()`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum RepairMode {
/// No repair; exact retransmission only.
Off,
/// Tail repair for the last missing chunks.
Tail,
/// Preemptive repair for lossy paths.
Lossy,
/// Repair gaps left by interrupted transfers.
ResumeRepair,
/// Multicast/broadcast repair.
Broadcast,
/// Multi-peer swarm coordination.
Swarm,
/// Minimize relay bandwidth usage.
RelayExpensive,
/// Handle unstable mobile connections.
MobileUnstable,
/// Handle paths with a high bandwidth-delay product.
HighBDP,
}
impl RepairMode {
pub fn description(&self) -> &'static str {
match self {
RepairMode::Off => "no repair - exact retransmission only",
RepairMode::Tail => "tail repair for last missing chunks",
RepairMode::Lossy => "preemptive repair for lossy paths",
RepairMode::ResumeRepair => "repair gaps from interrupted transfers",
RepairMode::Broadcast => "efficient multicast/broadcast repair",
RepairMode::Swarm => "multi-peer swarm coordination",
RepairMode::RelayExpensive => "minimize relay bandwidth usage",
RepairMode::MobileUnstable => "handle unstable mobile connections",
RepairMode::HighBDP => "handle high bandwidth-delay product",
}
}
pub fn requires_multi_source(&self) -> bool {
matches!(self, RepairMode::Swarm | RepairMode::Broadcast)
}
pub fn typical_overhead_multiplier(&self) -> f64 {
match self {
RepairMode::Off => 0.0,
RepairMode::Tail => 0.05, RepairMode::Lossy => 0.15, RepairMode::ResumeRepair => 0.1, RepairMode::Broadcast => 0.2, RepairMode::Swarm => 0.25, RepairMode::RelayExpensive => 0.3, RepairMode::MobileUnstable => 0.35, RepairMode::HighBDP => 0.4, }
}
}
/// Measured or estimated properties of the network path for a transfer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathCharacteristics {
/// Round-trip time; the estimators truncate it to whole milliseconds.
pub rtt_ms: f64,
/// Path bandwidth; used as the divisor when converting bandwidth overhead
/// (in bytes) into a cost term.
/// NOTE(review): confirm whether this is bits or bytes per second.
pub bandwidth_bps: u64,
/// Loss rate as a fraction (values above 0.01 enable `RepairMode::Lossy`).
pub loss_rate: f64,
/// Jitter. NOTE(review): not read by the code in this file.
pub jitter_ms: f64,
/// Bandwidth-delay product; values above 1_000_000 enable `RepairMode::HighBDP`.
pub bdp_bytes: u64,
/// Stability score; values below 0.7 enable `RepairMode::MobileUnstable`,
/// and it contributes half of the ROI confidence.
pub stability_score: f64,
/// Whether the path goes through a relay.
pub uses_relay: bool,
/// Relay cost per byte; only considered when `uses_relay` is set.
pub relay_cost_per_byte: f64,
/// Marks a mobile path; enables `RepairMode::MobileUnstable`.
pub is_mobile: bool,
/// Marks a high-latency path; enables `RepairMode::HighBDP`.
pub is_high_latency: bool,
}
impl Default for PathCharacteristics {
    /// Returns a baseline direct (non-relay, non-mobile) path: 50 ms RTT,
    /// 10_000_000 bandwidth, 0.1% loss, 5 ms jitter and a stability of 0.9.
    fn default() -> Self {
        Self {
            rtt_ms: 50.0,
            bandwidth_bps: 10_000_000,
            loss_rate: 0.001,
            jitter_ms: 5.0,
            bdp_bytes: 62_500,
            stability_score: 0.9,
            uses_relay: false,
            relay_cost_per_byte: 0.0,
            is_mobile: false,
            is_high_latency: false,
        }
    }
}
/// Snapshot of an in-progress transfer used as input to repair decisions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransferState {
/// Total object size; divisor of the completion ratio that enables `RepairMode::Tail`.
pub object_size_bytes: u64,
/// Bytes received so far; a completion ratio of at least 0.8 enables `RepairMode::Tail`.
pub bytes_transferred: u64,
/// Number of missing chunks; multiplied by the RTT to estimate retransmit time.
pub missing_chunks: usize,
/// Missing byte count; drives the CPU, bandwidth and memory cost estimates.
pub missing_bytes: u64,
/// Whether this transfer is a resume; enables `RepairMode::ResumeRepair`.
pub is_resume: bool,
/// Elapsed transfer time. NOTE(review): not read by the code in this file.
pub elapsed_time: Duration,
/// Retransmit attempts so far; contributes to ROI confidence, capped at 10.
pub retransmit_attempts: u32,
/// Number of available peers; more than one (plus a configured scheduler)
/// enables `RepairMode::Swarm`.
pub available_peers: usize,
/// Memory pressure; averaged with `cpu_pressure` into the `resource_pressure` factor.
pub memory_pressure: f64,
/// CPU pressure; averaged with `memory_pressure` into the `resource_pressure` factor.
pub cpu_pressure: f64,
}
/// Return-on-investment estimate for one repair mode.
///
/// The per-field notes describe how `RepairCoordinator::calculate_roi`
/// populates the value; instances built elsewhere may differ.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairRoi {
/// Estimated retransmit time minus estimated repair time, saturating at zero.
pub expected_time_saved: Duration,
/// Estimated encode CPU time, derived from the missing byte count.
pub encode_cpu_cost: Duration,
/// Estimated decode CPU time, derived from the missing byte count.
pub decode_cpu_cost: Duration,
/// Missing bytes multiplied by the mode's overhead multiplier.
pub bandwidth_overhead: u64,
/// One tenth of the missing bytes.
pub memory_overhead: u64,
/// 10 ms per available peer for multi-source modes, otherwise zero.
pub coordination_cost: Duration,
/// Time saved (seconds) plus any relay bandwidth benefit.
pub benefit_score: f64,
/// CPU, bandwidth and coordination cost terms plus a constant 1.0.
pub cost_score: f64,
/// `benefit_score / cost_score` (0.0 when the cost is not positive).
pub roi_ratio: f64,
/// Confidence in the estimate, capped at 1.0; `justifies_repair` requires at least 0.6.
pub confidence: f64,
}
impl RepairRoi {
    /// Returns `true` when the ROI ratio reaches `threshold` and the
    /// confidence is at least 0.6.
    pub fn justifies_repair(&self, threshold: f64) -> bool {
        let profitable = self.roi_ratio >= threshold;
        let confident = self.confidence >= 0.6;
        profitable && confident
    }

    /// Formats the ratio, benefit, cost and confidence (as a percentage)
    /// into a one-line summary.
    pub fn summary(&self) -> String {
        let Self {
            roi_ratio,
            benefit_score,
            cost_score,
            confidence,
            ..
        } = self;
        let confidence_pct = confidence * 100.0;
        format!(
            "ROI {:.2} (benefit: {:.2}, cost: {:.2}, confidence: {:.1}%)",
            roi_ratio, benefit_score, cost_score, confidence_pct
        )
    }
}
/// Outcome of `RepairCoordinator::decide_repair_mode`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairDecision {
/// Selected repair mode (`RepairMode::Off` when no candidate qualified).
pub mode: RepairMode,
/// ROI estimate for the selected mode.
pub roi: RepairRoi,
/// Human-readable explanation of the choice.
pub reasoning: String,
/// Input factors recorded alongside the decision.
pub factors: RepairDecisionFactors,
/// Wall-clock time at which the decision was built.
pub decided_at: SystemTime,
/// Trace identifier supplied by the caller.
pub trace_id: TraceId,
}
/// Inputs summarised for a decision, as computed by
/// `RepairCoordinator::analyze_decision_factors`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairDecisionFactors {
/// Mean of the latency, loss and stability scores of the path.
pub path_quality: f64,
/// The path's loss rate.
pub loss_impact: f64,
/// The path's BDP expressed in units of 64 KiB.
pub bdp_impact: f64,
/// Relay cost per byte when a relay is used, otherwise 0.0.
pub relay_cost_impact: f64,
/// 1.0 for resumed transfers, otherwise 0.0.
pub resume_benefit: f64,
/// `log2(peers) / 4` when more than one peer is available, otherwise 0.0.
pub multi_source_benefit: f64,
/// Midpoint of the transfer's CPU and memory pressure.
pub resource_pressure: f64,
}
/// Post-hoc measurement of a repair, submitted via
/// `RepairCoordinator::record_telemetry`.
///
/// Code in this file only inspects `mode`, `predicted_roi.roi_ratio`,
/// `actual_roi_ratio`, `success`, the two symbol counters and `measured_at`;
/// the remaining fields are stored as reported.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairTelemetry {
/// Object the measurement refers to.
pub object_id: ObjectId,
/// Repair mode that was used; keys the per-mode statistics.
pub mode: RepairMode,
/// ROI that was predicted; its `roi_ratio` must lie in `0.0..=1000.0`.
pub predicted_roi: RepairRoi,
/// Reported repair duration.
pub actual_repair_time: Duration,
/// Reported encode CPU time.
pub actual_encode_cpu: Duration,
/// Reported decode CPU time.
pub actual_decode_cpu: Duration,
/// Reported bandwidth used.
pub actual_bandwidth_used: u64,
/// Repair symbols sent; must be non-zero when `success` is set.
pub repair_symbols_sent: u32,
/// Repair symbols decoded; must not exceed `repair_symbols_sent`.
pub repair_symbols_decoded: u32,
/// Whether the repair succeeded; feeds the per-mode success rate.
pub success: bool,
/// Reported benefit score.
pub actual_benefit_score: f64,
/// Measured ROI ratio; must lie in `0.0..=1000.0`.
pub actual_roi_ratio: f64,
/// When the measurement was taken; must not be in the future or older than one hour.
pub measured_at: SystemTime,
}
/// Chooses a repair mode per transfer from ROI estimates and keeps bounded
/// decision/telemetry history plus per-mode statistics.
pub struct RepairCoordinator {
// Settings supplied at construction.
config: RepairCoordinatorConfig,
// Optional scheduler; `RepairMode::Swarm` is only offered when this is set.
multi_source_scheduler: Option<MultiSourceRepairScheduler>,
// Past decisions, oldest first.
decision_history: Vec<RepairDecision>,
// Accepted telemetry records, oldest first.
telemetry: Vec<RepairTelemetry>,
// Running statistics keyed by repair mode.
mode_statistics: HashMap<RepairMode, ModeStatistics>,
// Limits calls to `decide_repair_mode`.
decision_rate_limiter: RateLimiter,
// Limits calls to `record_telemetry`.
telemetry_rate_limiter: RateLimiter,
}
/// Running per-mode statistics maintained by `RepairCoordinator::record_telemetry`.
///
/// NOTE(review): the fields are private, so callers of `get_mode_statistics`
/// outside this module can only reach them through `Debug`/serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModeStatistics {
// Number of telemetry records folded in so far.
usage_count: u32,
// Running mean of the predicted ROI ratio.
avg_predicted_roi: f64,
// Running mean of the measured ROI ratio.
avg_actual_roi: f64,
// Running mean of the success flag (1.0 for success, 0.0 for failure).
success_rate: f64,
// Wall-clock time of the most recent update.
last_updated: SystemTime,
}
/// Fixed-window request counter.
///
/// At most `limit` requests are admitted per `window_duration`; the counter
/// resets once a full window has elapsed since `window_start`.
///
/// The window is timed with the monotonic [`std::time::Instant`] rather than
/// wall-clock time, so a system clock adjustment cannot stall or prematurely
/// reset the window.
#[derive(Debug)]
struct RateLimiter {
    /// Monotonic start of the current window.
    window_start: std::time::Instant,
    /// Requests admitted in the current window.
    request_count: u32,
    /// Maximum requests admitted per window.
    limit: u32,
    /// Length of one window.
    window_duration: Duration,
}

impl RateLimiter {
    /// Creates a limiter admitting `limit` requests per 60-second window,
    /// with the first window starting now.
    fn new(limit: u32) -> Self {
        Self {
            window_start: std::time::Instant::now(),
            request_count: 0,
            limit,
            window_duration: Duration::from_secs(60),
        }
    }

    /// Registers one request.
    ///
    /// Returns `true` and counts the request if the current window still has
    /// capacity, or `false` (without counting) once `limit` has been reached.
    fn check_rate_limit(&mut self) -> bool {
        // Start a fresh window once the previous one has fully elapsed.
        if self.window_start.elapsed() >= self.window_duration {
            self.window_start = std::time::Instant::now();
            self.request_count = 0;
        }

        if self.request_count < self.limit {
            self.request_count += 1;
            true
        } else {
            false
        }
    }
}
impl Default for ModeStatistics {
    /// Returns empty statistics: no samples, neutral averages of 1.0, and
    /// `last_updated` set to the current wall-clock time.
    fn default() -> Self {
        let created_at = SystemTime::now();
        Self {
            usage_count: 0,
            avg_predicted_roi: 1.0,
            avg_actual_roi: 1.0,
            success_rate: 1.0,
            last_updated: created_at,
        }
    }
}
impl RepairCoordinator {
/// Creates a coordinator with empty history and statistics, no multi-source
/// scheduler, and rate limiters sized from `config`.
pub fn new(config: RepairCoordinatorConfig) -> Self {
    // Read the limits before `config` is moved into the struct.
    let decision_rate_limiter = RateLimiter::new(config.max_decisions_per_minute);
    let telemetry_rate_limiter = RateLimiter::new(config.max_telemetry_per_minute);
    Self {
        config,
        multi_source_scheduler: None,
        decision_history: Vec::new(),
        telemetry: Vec::new(),
        mode_statistics: HashMap::new(),
        decision_rate_limiter,
        telemetry_rate_limiter,
    }
}
/// Builder-style setter: installs `scheduler` (replacing any previous one)
/// and returns the coordinator.
pub fn with_multi_source(mut self, scheduler: MultiSourceRepairScheduler) -> Self {
    drop(self.multi_source_scheduler.replace(scheduler));
    self
}
pub fn decide_repair_mode(
&mut self,
_object_id: ObjectId,
path: &PathCharacteristics,
transfer: &TransferState,
trace_id: TraceId,
) -> Result<RepairDecision> {
if !self.decision_rate_limiter.check_rate_limit() {
return Err(crate::error::Error::new(
crate::error::ErrorKind::RateLimited,
));
}
let mode_candidates = self.get_applicable_modes(path, transfer);
let mut best_decision: Option<RepairDecision> = None;
let mut best_roi = 0.0;
for mode in mode_candidates {
let roi = self.calculate_roi(mode, path, transfer)?;
if roi.justifies_repair(self.config.min_roi_threshold) && roi.roi_ratio > best_roi {
let decision = RepairDecision {
mode,
roi: roi.clone(),
reasoning: self.generate_reasoning(mode, &roi, path, transfer),
factors: self.analyze_decision_factors(mode, path, transfer),
decided_at: SystemTime::now(),
trace_id,
};
best_roi = roi.roi_ratio;
best_decision = Some(decision);
}
}
let decision = best_decision.unwrap_or_else(|| RepairDecision {
mode: RepairMode::Off,
roi: RepairRoi {
expected_time_saved: Duration::ZERO,
encode_cpu_cost: Duration::ZERO,
decode_cpu_cost: Duration::ZERO,
bandwidth_overhead: 0,
memory_overhead: 0,
coordination_cost: Duration::ZERO,
benefit_score: 0.0,
cost_score: 1.0,
roi_ratio: 0.0,
confidence: 1.0,
},
reasoning: format!(
"No repair mode meets ROI threshold {:.2}",
self.config.min_roi_threshold
),
factors: RepairDecisionFactors {
path_quality: self.assess_path_quality(path),
loss_impact: path.loss_rate,
bdp_impact: (path.bdp_bytes as f64) / (64.0 * 1024.0), relay_cost_impact: if path.uses_relay {
path.relay_cost_per_byte
} else {
0.0
},
resume_benefit: if transfer.is_resume { 1.0 } else { 0.0 },
multi_source_benefit: if transfer.available_peers > 1 {
(transfer.available_peers as f64).log2() / 4.0
} else {
0.0
},
resource_pressure: f64::midpoint(transfer.cpu_pressure, transfer.memory_pressure),
},
decided_at: SystemTime::now(),
trace_id,
});
self.log_decision(&decision);
self.decision_history.push(decision.clone());
if self.decision_history.len() > self.config.max_decision_history {
let drain_count = self.config.max_decision_history / 10; self.decision_history.drain(0..drain_count);
}
Ok(decision)
}
/// Validates and stores a telemetry record and folds it into the running
/// statistics of its repair mode.
///
/// # Errors
///
/// Returns `ErrorKind::RateLimited` when the telemetry rate limiter rejects
/// the call (checked before validation), and propagates the error from
/// `validate_telemetry` for implausible records.
pub fn record_telemetry(&mut self, telemetry: RepairTelemetry) -> Result<()> {
    if !self.telemetry_rate_limiter.check_rate_limit() {
        return Err(crate::error::Error::new(
            crate::error::ErrorKind::RateLimited,
        ));
    }
    self.validate_telemetry(&telemetry)?;

    let stats = self.mode_statistics.entry(telemetry.mode).or_default();
    // Saturate rather than overflow if a mode ever reaches u32::MAX samples.
    stats.usage_count = stats.usage_count.saturating_add(1);
    let prior_samples = f64::from(stats.usage_count - 1);
    let samples = f64::from(stats.usage_count);
    // Incremental mean: the first sample replaces the default value because
    // `prior_samples` is zero.
    let fold = |mean: f64, sample: f64| (mean * prior_samples + sample) / samples;
    stats.avg_predicted_roi = fold(stats.avg_predicted_roi, telemetry.predicted_roi.roi_ratio);
    stats.avg_actual_roi = fold(stats.avg_actual_roi, telemetry.actual_roi_ratio);
    stats.success_rate = fold(
        stats.success_rate,
        if telemetry.success { 1.0 } else { 0.0 },
    );
    stats.last_updated = SystemTime::now();

    self.telemetry.push(telemetry);

    // Trim the oldest entries in batches of ~10% of the cap, but always remove
    // at least the excess so caps below 10 (including 0) are still enforced.
    let cap = self.config.max_telemetry_history;
    let len = self.telemetry.len();
    if len > cap {
        let drain_count = (cap / 10).max(len - cap);
        self.telemetry.drain(..drain_count);
    }

    Ok(())
}
/// Returns the per-mode running statistics accumulated by `record_telemetry`.
pub fn get_mode_statistics(&self) -> &HashMap<RepairMode, ModeStatistics> {
&self.mode_statistics
}
/// Returns up to `limit` of the most recent decisions, oldest first.
pub fn get_decision_history(&self, limit: usize) -> &[RepairDecision] {
    let history = self.decision_history.as_slice();
    let skip = history.len().saturating_sub(limit);
    &history[skip..]
}
/// Lists the repair modes worth scoring for this path and transfer.
///
/// `RepairMode::Off` is always first; the remaining modes are appended in a
/// fixed order whenever their gating condition holds.
fn get_applicable_modes(
    &self,
    path: &PathCharacteristics,
    transfer: &TransferState,
) -> Vec<RepairMode> {
    let completed_fraction =
        transfer.bytes_transferred as f64 / transfer.object_size_bytes as f64;

    // (gating condition, mode) pairs in the order they are offered.
    let gated_modes = [
        (completed_fraction >= 0.8, RepairMode::Tail),
        (path.loss_rate > 0.01, RepairMode::Lossy),
        (transfer.is_resume, RepairMode::ResumeRepair),
        (
            path.uses_relay && path.relay_cost_per_byte > 0.0,
            RepairMode::RelayExpensive,
        ),
        (
            path.is_mobile || path.stability_score < 0.7,
            RepairMode::MobileUnstable,
        ),
        (
            path.is_high_latency || path.bdp_bytes > 1_000_000,
            RepairMode::HighBDP,
        ),
        (
            transfer.available_peers > 1 && self.multi_source_scheduler.is_some(),
            RepairMode::Swarm,
        ),
    ];

    std::iter::once(RepairMode::Off)
        .chain(
            gated_modes
                .iter()
                .filter_map(|&(applies, mode)| applies.then_some(mode)),
        )
        .collect()
}
/// Estimates the return on investment of applying `mode` to this transfer.
///
/// `RepairMode::Off` yields a fixed zero-benefit estimate with full
/// confidence. For other modes the benefit is the time saved versus
/// retransmission plus any relay bandwidth saving, and the cost is the sum
/// of CPU, bandwidth and coordination terms plus a constant 1.0.
fn calculate_roi(
    &self,
    mode: RepairMode,
    path: &PathCharacteristics,
    transfer: &TransferState,
) -> Result<RepairRoi> {
    if matches!(mode, RepairMode::Off) {
        return Ok(RepairRoi {
            expected_time_saved: Duration::ZERO,
            encode_cpu_cost: Duration::ZERO,
            decode_cpu_cost: Duration::ZERO,
            bandwidth_overhead: 0,
            memory_overhead: 0,
            coordination_cost: Duration::ZERO,
            benefit_score: 0.0,
            cost_score: 1.0,
            roi_ratio: 0.0,
            confidence: 1.0,
        });
    }

    let missing = transfer.missing_bytes;
    let overhead_multiplier = mode.typical_overhead_multiplier();

    // Time saved relative to plain retransmission (never negative).
    let expected_time_saved = self
        .estimate_retransmit_time(path, transfer)
        .saturating_sub(self.estimate_repair_time(mode, path, transfer));

    // Resource estimates derived from the amount of missing data.
    let encode_cpu_cost = Duration::from_millis(missing / 1024 / 10);
    let decode_cpu_cost = Duration::from_millis(missing / 1024 / 20);
    let bandwidth_overhead = (missing as f64 * overhead_multiplier) as u64;
    let memory_overhead = (missing as f64 * 0.1) as u64;
    let coordination_cost = match mode.requires_multi_source() {
        true => Duration::from_millis(transfer.available_peers as u64 * 10),
        false => Duration::ZERO,
    };

    // Benefit side.
    let relay_saving = if path.uses_relay {
        (missing as f64 * path.relay_cost_per_byte * (1.0 - overhead_multiplier)).max(0.0)
    } else {
        0.0
    };
    let benefit_score = expected_time_saved.as_secs_f64() + relay_saving;

    // Cost side; the trailing 1.0 keeps the denominator away from zero.
    let cpu_seconds = (encode_cpu_cost + decode_cpu_cost).as_secs_f64();
    let bandwidth_seconds = bandwidth_overhead as f64 / path.bandwidth_bps as f64;
    let coordination_seconds = coordination_cost.as_secs_f64();
    let cost_score = cpu_seconds + bandwidth_seconds + coordination_seconds + 1.0;

    let roi_ratio = if cost_score > 0.0 {
        benefit_score / cost_score
    } else {
        0.0
    };

    // Confidence blends path stability, retransmit history (capped at 10
    // attempts) and a fixed base, limited to 1.0.
    let confidence = (path.stability_score * 0.5
        + (transfer.retransmit_attempts.min(10) as f64 / 10.0) * 0.3
        + 0.2)
        .min(1.0);

    Ok(RepairRoi {
        expected_time_saved,
        encode_cpu_cost,
        decode_cpu_cost,
        bandwidth_overhead,
        memory_overhead,
        coordination_cost,
        benefit_score,
        cost_score,
        roi_ratio,
        confidence,
    })
}
/// Estimates how long plain retransmission of the missing chunks would take.
///
/// The estimate is one RTT (truncated to whole milliseconds) per missing
/// chunk, scaled by `1 + 2 * loss_rate`. The chunk multiplication saturates
/// instead of overflowing for extreme inputs (for example an infinite RTT).
fn estimate_retransmit_time(
    &self,
    path: &PathCharacteristics,
    transfer: &TransferState,
) -> Duration {
    let rtt_whole_ms = path.rtt_ms as u64;
    let base_ms = rtt_whole_ms.saturating_mul(transfer.missing_chunks as u64);
    let loss_multiplier = 1.0 + path.loss_rate * 2.0;
    Duration::from_millis((base_ms as f64 * loss_multiplier) as u64)
}
/// Estimates how long repair with `mode` would take: half an RTT (in whole
/// milliseconds) scaled by `1 + typical_overhead_multiplier`.
fn estimate_repair_time(
    &self,
    mode: RepairMode,
    path: &PathCharacteristics,
    _transfer: &TransferState,
) -> Duration {
    let half_rtt_ms = path.rtt_ms as u64 / 2;
    let scale = mode.typical_overhead_multiplier() + 1.0;
    let scaled_ms = (half_rtt_ms as f64 * scale) as u64;
    Duration::from_millis(scaled_ms)
}
fn assess_path_quality(&self, path: &PathCharacteristics) -> f64 {
let latency_score = (100.0 - path.rtt_ms.min(100.0)) / 100.0;
let loss_score = 1.0 - path.loss_rate.min(1.0);
let stability_score = path.stability_score;
(latency_score + loss_score + stability_score) / 3.0
}
/// Builds the human-readable explanation for choosing `mode`.
///
/// The text is the mode description and ROI summary, followed by a
/// parenthesised, comma-separated list of contributing reasons when any
/// apply.
fn generate_reasoning(
    &self,
    mode: RepairMode,
    roi: &RepairRoi,
    path: &PathCharacteristics,
    transfer: &TransferState,
) -> String {
    let mut reasons: Vec<String> = Vec::new();

    reasons.extend((roi.expected_time_saved > Duration::from_millis(100)).then(|| {
        format!(
            "saves {:.1}s vs retransmit",
            roi.expected_time_saved.as_secs_f64()
        )
    }));
    reasons.extend(
        (path.loss_rate > 0.01)
            .then(|| format!("high loss rate {:.1}%", path.loss_rate * 100.0)),
    );
    reasons.extend(path.uses_relay.then(|| "expensive relay path".to_string()));
    reasons.extend(transfer.is_resume.then(|| "resume repair benefit".to_string()));
    reasons.extend(
        (transfer.available_peers > 1 && mode.requires_multi_source())
            .then(|| format!("{} peers available", transfer.available_peers)),
    );

    if reasons.is_empty() {
        return format!("{} - {}", mode.description(), roi.summary());
    }
    format!(
        "{} - {} ({})",
        mode.description(),
        roi.summary(),
        reasons.join(", ")
    )
}
/// Summarises the path and transfer inputs recorded alongside a decision.
///
/// The mode argument is currently unused; the factors depend only on the
/// path and transfer state.
fn analyze_decision_factors(
    &self,
    _mode: RepairMode,
    path: &PathCharacteristics,
    transfer: &TransferState,
) -> RepairDecisionFactors {
    let relay_cost_impact = match path.uses_relay {
        true => path.relay_cost_per_byte,
        false => 0.0,
    };
    let resume_benefit = match transfer.is_resume {
        true => 1.0,
        false => 0.0,
    };
    let peers = transfer.available_peers;
    let multi_source_benefit = match peers > 1 {
        true => (peers as f64).log2() / 4.0,
        false => 0.0,
    };

    RepairDecisionFactors {
        path_quality: self.assess_path_quality(path),
        loss_impact: path.loss_rate,
        // BDP expressed in units of 64 KiB.
        bdp_impact: (path.bdp_bytes as f64) / (64.0 * 1024.0),
        relay_cost_impact,
        resume_benefit,
        multi_source_benefit,
        resource_pressure: f64::midpoint(transfer.cpu_pressure, transfer.memory_pressure),
    }
}
fn validate_telemetry(&self, telemetry: &RepairTelemetry) -> Result<()> {
if telemetry.actual_roi_ratio < 0.0 || telemetry.actual_roi_ratio > 1000.0 {
return Err(crate::error::Error::new(
crate::error::ErrorKind::InvalidInput,
));
}
if telemetry.predicted_roi.roi_ratio < 0.0 || telemetry.predicted_roi.roi_ratio > 1000.0 {
return Err(crate::error::Error::new(
crate::error::ErrorKind::InvalidInput,
));
}
if telemetry.repair_symbols_sent == 0 && telemetry.success {
return Err(crate::error::Error::new(
crate::error::ErrorKind::InvalidInput,
));
}
if telemetry.repair_symbols_decoded > telemetry.repair_symbols_sent {
return Err(crate::error::Error::new(
crate::error::ErrorKind::InvalidInput,
));
}
let now = SystemTime::now();
if telemetry.measured_at > now {
return Err(crate::error::Error::new(
crate::error::ErrorKind::InvalidInput,
));
}
if let Ok(age) = now.duration_since(telemetry.measured_at) {
if age > Duration::from_secs(3600) {
return Err(crate::error::Error::new(
crate::error::ErrorKind::InvalidInput,
));
}
}
Ok(())
}
/// Emits a log line for `decision` according to the configured logging level.
///
/// `Off` logs nothing, `Normal` logs only decisions that enable a repair
/// mode, `Verbose` logs every decision, and `Debug` logs every decision at
/// debug level together with the ROI ratio.
fn log_decision(&self, decision: &RepairDecision) {
    match self.config.decision_logging_level {
        RepairLoggingLevel::Off => {}
        // At the normal level, "no repair" decisions are not worth a line.
        RepairLoggingLevel::Normal if decision.mode == RepairMode::Off => {}
        RepairLoggingLevel::Normal | RepairLoggingLevel::Verbose => {
            info!(
                "Repair decision: {:?} - {}",
                decision.mode, decision.reasoning
            );
        }
        RepairLoggingLevel::Debug => {
            debug!(
                "Repair decision: {:?} - {} (ROI: {:.2})",
                decision.mode, decision.reasoning, decision.roi.roi_ratio
            );
        }
    }
}
}
// Unit tests for the repair coordinator: construction, mode metadata, path
// scoring, mode applicability, ROI calculation, decisions and telemetry.
#[cfg(test)]
mod tests {
use super::*;
use crate::atp::object::ContentId;
// Builds a fixed object id from a constant byte string.
fn test_object_id() -> ObjectId {
ObjectId::content(ContentId::from_bytes(b"test-object"))
}
// A direct path with 2% loss, which is above the 1% threshold for `Lossy`.
fn create_test_path() -> PathCharacteristics {
PathCharacteristics {
rtt_ms: 50.0,
bandwidth_bps: 10_000_000,
loss_rate: 0.02, jitter_ms: 5.0,
bdp_bytes: 62_500,
stability_score: 0.8,
uses_relay: false,
relay_cost_per_byte: 0.0,
is_mobile: false,
is_high_latency: false,
}
}
// A transfer that is exactly 80% complete, which makes `Tail` applicable.
fn create_test_transfer() -> TransferState {
TransferState {
object_size_bytes: 1_000_000, bytes_transferred: 800_000, missing_chunks: 10,
missing_bytes: 200_000,
is_resume: false,
elapsed_time: Duration::from_secs(5),
retransmit_attempts: 3,
available_peers: 1,
memory_pressure: 0.3,
cpu_pressure: 0.4,
}
}
// A new coordinator starts with empty decision and telemetry history.
#[test]
fn test_repair_coordinator_creation() {
let config = RepairCoordinatorConfig::default();
let coordinator = RepairCoordinator::new(config);
assert_eq!(coordinator.decision_history.len(), 0);
assert_eq!(coordinator.telemetry.len(), 0);
}
// Spot-checks description strings and the multi-source flag.
#[test]
fn test_repair_mode_descriptions() {
assert_eq!(
RepairMode::Off.description(),
"no repair - exact retransmission only"
);
assert_eq!(
RepairMode::Tail.description(),
"tail repair for last missing chunks"
);
assert!(RepairMode::Swarm.requires_multi_source());
assert!(!RepairMode::Tail.requires_multi_source());
}
// Path quality for the test path falls in (0, 1].
#[test]
fn test_path_quality_assessment() {
let config = RepairCoordinatorConfig::default();
let coordinator = RepairCoordinator::new(config);
let path = create_test_path();
let quality = coordinator.assess_path_quality(&path);
assert!(quality > 0.0 && quality <= 1.0);
}
// The test fixtures should enable `Off`, `Tail` and `Lossy`.
#[test]
fn test_applicable_modes() {
let config = RepairCoordinatorConfig::default();
let coordinator = RepairCoordinator::new(config);
let path = create_test_path();
let transfer = create_test_transfer();
let modes = coordinator.get_applicable_modes(&path, &transfer);
assert!(modes.contains(&RepairMode::Off));
assert!(modes.contains(&RepairMode::Tail));
assert!(modes.contains(&RepairMode::Lossy));
}
// `Off` has zero ROI and therefore never justifies repair.
#[test]
fn test_roi_calculation_off_mode() -> Result<()> {
let config = RepairCoordinatorConfig::default();
let coordinator = RepairCoordinator::new(config);
let path = create_test_path();
let transfer = create_test_transfer();
let roi = coordinator.calculate_roi(RepairMode::Off, &path, &transfer)?;
assert_eq!(roi.roi_ratio, 0.0);
assert_eq!(roi.expected_time_saved, Duration::ZERO);
assert!(!roi.justifies_repair(1.0));
Ok(())
}
// `Tail` yields a positive ROI and confidence for the test fixtures.
#[test]
fn test_roi_calculation_tail_mode() -> Result<()> {
let config = RepairCoordinatorConfig::default();
let coordinator = RepairCoordinator::new(config);
let path = create_test_path();
let transfer = create_test_transfer();
let roi = coordinator.calculate_roi(RepairMode::Tail, &path, &transfer)?;
assert!(roi.roi_ratio > 0.0);
assert!(roi.confidence > 0.0);
Ok(())
}
// A decision made with a lowered ROI threshold carries reasoning and confidence.
#[test]
fn test_repair_decision() -> Result<()> {
let config = RepairCoordinatorConfig {
min_roi_threshold: 0.5, ..RepairCoordinatorConfig::default()
};
let mut coordinator = RepairCoordinator::new(config);
let path = create_test_path();
let transfer = create_test_transfer();
let object_id = test_object_id();
let decision = coordinator.decide_repair_mode(
object_id,
&path,
&transfer,
TraceId::from_parts(1, 1),
)?;
assert!(!decision.reasoning.is_empty());
assert!(decision.roi.confidence > 0.0);
Ok(())
}
// Recording one valid telemetry sample stores it and creates statistics for its mode.
#[test]
fn test_telemetry_recording() {
let config = RepairCoordinatorConfig::default();
let mut coordinator = RepairCoordinator::new(config);
let telemetry = RepairTelemetry {
object_id: test_object_id(),
mode: RepairMode::Tail,
predicted_roi: RepairRoi {
roi_ratio: 1.5,
..RepairRoi {
expected_time_saved: Duration::from_millis(100),
encode_cpu_cost: Duration::from_millis(10),
decode_cpu_cost: Duration::from_millis(5),
bandwidth_overhead: 1000,
memory_overhead: 500,
coordination_cost: Duration::ZERO,
benefit_score: 2.0,
cost_score: 1.0,
roi_ratio: 1.5,
confidence: 0.8,
}
},
actual_repair_time: Duration::from_millis(90),
actual_encode_cpu: Duration::from_millis(12),
actual_decode_cpu: Duration::from_millis(6),
actual_bandwidth_used: 1200,
repair_symbols_sent: 5,
repair_symbols_decoded: 5,
success: true,
actual_benefit_score: 2.1,
actual_roi_ratio: 1.6,
measured_at: SystemTime::now(),
};
coordinator
.record_telemetry(telemetry)
.expect("Telemetry recording failed");
assert_eq!(coordinator.telemetry.len(), 1);
assert!(coordinator.mode_statistics.contains_key(&RepairMode::Tail));
let stats = &coordinator.mode_statistics[&RepairMode::Tail];
assert_eq!(stats.usage_count, 1);
assert_eq!(stats.success_rate, 1.0);
}
}