use crate::Cx;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};
pub const ATP_TRANSFER_PLAN_SCHEMA: &str = "atp-transfer-plan-v1";
pub const ATP_PLAN_EXECUTION_REPORT_SCHEMA: &str = "atp-plan-execution-report-v1";
#[derive(Debug, thiserror::Error)]
pub enum PlannerError {
#[error("Invalid input: {0}")]
InvalidInput(String),
#[error("Insufficient disk space: need {needed} bytes, have {available} bytes")]
InsufficientDiskSpace { needed: u64, available: u64 },
#[error("Quota exceeded: {0}")]
QuotaExceeded(String),
#[error("Path not available: {0}")]
PathNotAvailable(String),
#[error("Receive denied: {0}")]
ReceiveDenied(String),
#[error("Planning failed: {0}")]
PlanningFailed(String),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransferType {
Send,
Sync,
Mirror,
Share,
Stream,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TransferMode {
Direct,
RelayOnly,
Mailbox,
Swarm,
SparseImage,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObjectGraphSummary {
pub object_count: u64,
pub total_bytes: u64,
pub file_count: u64,
pub directory_count: u64,
pub largest_file_bytes: u64,
pub small_files_count: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkingProfile {
pub chunk_size: u32,
pub estimated_chunks: u64,
pub repair_overhead_ratio: f64,
pub compression_ratio: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathCandidate {
pub path_type: String,
pub estimated_rtt: Duration,
pub estimated_bandwidth: u64,
pub reliability_score: f64,
pub preferred: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiskAllocationPlan {
pub destination_path: PathBuf,
pub required_space: u64,
pub available_space: u64,
pub preallocation_strategy: String,
pub temp_space_needed: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceGovernanceProfile {
pub max_connections: u32,
pub bandwidth_limit: Option<u64>,
pub memory_limit: Option<u64>,
pub cpu_limit: Option<f32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumeState {
pub resume_available: bool,
pub resume_token: Option<String>,
pub bytes_completed: u64,
pub chunks_completed: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheAnalysis {
pub local_hit_ratio: f64,
pub remote_hit_ratio: f64,
pub bytes_from_cache: u64,
pub cache_locations: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanUncertainty {
pub bandwidth_confidence: f64,
pub path_confidence: f64,
pub peer_confidence: f64,
pub resource_confidence: f64,
pub uncertainty_factors: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AtpTransferPlan {
pub schema_version: String,
pub plan_id: String,
pub created_at: SystemTime,
pub transfer_type: TransferType,
pub transfer_mode: TransferMode,
pub object_graph: ObjectGraphSummary,
pub chunking_profile: ChunkingProfile,
pub estimated_bytes_on_wire: u64,
pub path_candidates: Vec<PathCandidate>,
pub disk_allocation: DiskAllocationPlan,
pub governance_profile: ResourceGovernanceProfile,
pub resume_state: ResumeState,
pub cache_analysis: CacheAnalysis,
pub proof_outputs: HashMap<String, String>,
pub uncertainty: PlanUncertainty,
pub estimated_duration: Duration,
pub estimated_completion: SystemTime,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanDeviation {
pub timestamp: SystemTime,
pub deviation_type: String,
pub expected_value: String,
pub actual_value: String,
pub reason: String,
pub impact_severity: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanExecutionReport {
pub schema_version: String,
pub plan_id: String,
pub started_at: SystemTime,
pub completed_at: Option<SystemTime>,
pub deviations: Vec<PlanDeviation>,
pub final_stats: HashMap<String, serde_json::Value>,
pub success: bool,
pub error_message: Option<String>,
}
#[derive(Debug)]
pub struct AtpTransferPlanner {
config: PlannerConfig,
}
#[derive(Debug, Clone)]
pub struct PlannerConfig {
pub default_chunk_size: u32,
pub default_repair_overhead: f64,
pub default_bandwidth_bps: u64,
pub planning_timeout: Duration,
}
impl Default for PlannerConfig {
fn default() -> Self {
Self {
default_chunk_size: 64 * 1024, default_repair_overhead: 0.1, default_bandwidth_bps: 10 * 1024 * 1024, planning_timeout: Duration::from_secs(30),
}
}
}
fn analyze_directory_tree(source_path: &Path) -> Result<ObjectGraphSummary, PlannerError> {
let mut summary = ObjectGraphSummary {
object_count: 0,
total_bytes: 0,
file_count: 0,
directory_count: 0,
largest_file_bytes: 0,
small_files_count: 0,
};
let mut stack = vec![source_path.to_path_buf()];
while let Some(path) = stack.pop() {
let metadata = std::fs::symlink_metadata(&path).map_err(|error| {
PlannerError::InvalidInput(format!(
"Cannot read source metadata for {}: {error}",
path.display()
))
})?;
summary.object_count = summary.object_count.saturating_add(1);
if metadata.is_dir() {
summary.directory_count = summary.directory_count.saturating_add(1);
for entry in std::fs::read_dir(&path).map_err(|error| {
PlannerError::InvalidInput(format!(
"Cannot read source directory {}: {error}",
path.display()
))
})? {
let entry = entry.map_err(|error| {
PlannerError::InvalidInput(format!(
"Cannot read source directory entry in {}: {error}",
path.display()
))
})?;
stack.push(entry.path());
}
} else {
let len = metadata.len();
summary.file_count = summary.file_count.saturating_add(1);
summary.total_bytes = summary.total_bytes.saturating_add(len);
summary.largest_file_bytes = summary.largest_file_bytes.max(len);
if len < 1024 * 1024 {
summary.small_files_count = summary.small_files_count.saturating_add(1);
}
}
}
Ok(summary)
}
fn available_space_bytes(path: &Path) -> Result<u64, PlannerError> {
#[cfg(unix)]
{
let stats = nix::sys::statvfs::statvfs(path).map_err(|error| {
PlannerError::PlanningFailed(format!(
"Cannot inspect available disk space for {}: {error}",
path.display()
))
})?;
let available =
u128::from(stats.blocks_available()).saturating_mul(u128::from(stats.fragment_size()));
Ok(available.min(u128::from(u64::MAX)) as u64)
}
#[cfg(not(unix))]
{
let _ = path;
Err(PlannerError::PlanningFailed(
"disk space probing is not supported on this target".to_string(),
))
}
}
fn deterministic_local_cache_ratio(object_graph: &ObjectGraphSummary) -> f64 {
if object_graph.total_bytes == 0 {
return 0.0;
}
let small_file_ratio = if object_graph.file_count == 0 {
0.0
} else {
object_graph.small_files_count as f64 / object_graph.file_count as f64
};
let directory_bonus = if object_graph.directory_count > 0 {
0.05
} else {
0.0
};
(0.10 + small_file_ratio * 0.25 + directory_bonus).min(0.60)
}
fn deterministic_remote_cache_ratio(object_graph: &ObjectGraphSummary) -> f64 {
if object_graph.total_bytes == 0 {
return 0.0;
}
let graph_scale = (object_graph.object_count as f64).log2().max(0.0) / 20.0;
(0.05 + graph_scale).min(0.40)
}
fn cache_locations_for_ratios(local_hit_ratio: f64, remote_hit_ratio: f64) -> Vec<String> {
let mut locations = Vec::new();
if local_hit_ratio > 0.0 {
locations.push("local".to_string());
}
if remote_hit_ratio > 0.0 {
locations.push("relay".to_string());
}
locations
}
fn completed_bytes_for_path(path: &Path) -> Result<u64, PlannerError> {
let metadata = std::fs::symlink_metadata(path).map_err(|error| {
PlannerError::InvalidInput(format!(
"Cannot read resume metadata for {}: {error}",
path.display()
))
})?;
if metadata.is_dir() {
Ok(analyze_directory_tree(path)?.total_bytes)
} else {
Ok(metadata.len())
}
}
impl AtpTransferPlanner {
pub fn new(config: PlannerConfig) -> Self {
Self { config }
}
pub fn new_default() -> Self {
Self::new(PlannerConfig::default())
}
pub async fn plan_transfer(
&self,
cx: &Cx,
transfer_type: TransferType,
source_path: &PathBuf,
destination_path: &PathBuf,
options: PlannerOptions,
) -> Result<AtpTransferPlan, PlannerError> {
let plan_id = self.generate_plan_id();
let created_at = SystemTime::now();
let object_graph = self.analyze_object_graph(cx, source_path).await?;
let chunking_profile = self.generate_chunking_profile(&object_graph);
let disk_allocation = self
.analyze_disk_allocation(destination_path, &object_graph)
.await?;
let path_candidates = self.generate_path_candidates(&options).await?;
let cache_analysis = self
.analyze_cache_opportunities(&object_graph, &options)
.await?;
let resume_state = self.check_resume_state(destination_path, &options).await?;
let governance_profile = self.generate_governance_profile(&options);
let uncertainty = self.calculate_uncertainty(&path_candidates, &options);
let estimated_bytes_on_wire = self.estimate_bytes_on_wire(&object_graph, &chunking_profile);
let estimated_duration = self.estimate_transfer_duration(
estimated_bytes_on_wire,
&path_candidates,
&cache_analysis,
);
let estimated_completion = created_at + estimated_duration;
Ok(AtpTransferPlan {
schema_version: ATP_TRANSFER_PLAN_SCHEMA.to_string(),
plan_id,
created_at,
transfer_type,
transfer_mode: options.transfer_mode,
object_graph,
chunking_profile,
estimated_bytes_on_wire,
path_candidates,
disk_allocation,
governance_profile,
resume_state,
cache_analysis,
proof_outputs: options.proof_outputs.unwrap_or_default(),
uncertainty,
estimated_duration,
estimated_completion,
})
}
pub async fn validate_plan(
&self,
_cx: &Cx,
plan: &AtpTransferPlan,
) -> Result<Vec<String>, PlannerError> {
let mut warnings = Vec::new();
if plan.disk_allocation.available_space < plan.disk_allocation.required_space {
return Err(PlannerError::InsufficientDiskSpace {
needed: plan.disk_allocation.required_space,
available: plan.disk_allocation.available_space,
});
}
if plan.path_candidates.is_empty() {
return Err(PlannerError::PathNotAvailable(
"No paths available".to_string(),
));
}
if plan.uncertainty.bandwidth_confidence < 0.5 {
warnings.push("Low bandwidth estimation confidence".to_string());
}
if plan.uncertainty.path_confidence < 0.7 {
warnings.push("Low path availability confidence".to_string());
}
Ok(warnings)
}
fn generate_plan_id(&self) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
SystemTime::now().hash(&mut hasher);
std::process::id().hash(&mut hasher);
format!("plan_{:016x}", hasher.finish())
}
async fn analyze_object_graph(
&self,
_cx: &Cx,
source_path: &PathBuf,
) -> Result<ObjectGraphSummary, PlannerError> {
if !source_path.exists() {
return Err(PlannerError::InvalidInput(format!(
"Source path does not exist: {}",
source_path.display()
)));
}
let metadata = crate::fs::metadata(source_path).await.map_err(|e| {
PlannerError::InvalidInput(format!("Cannot read source metadata: {}", e))
})?;
if metadata.is_file() {
Ok(ObjectGraphSummary {
object_count: 1,
total_bytes: metadata.len(),
file_count: 1,
directory_count: 0,
largest_file_bytes: metadata.len(),
small_files_count: u64::from(metadata.len() < 1024 * 1024),
})
} else {
analyze_directory_tree(source_path)
}
}
fn generate_chunking_profile(&self, object_graph: &ObjectGraphSummary) -> ChunkingProfile {
let chunk_size = if object_graph.total_bytes < 512 * 1024 {
32 * 1024
} else {
self.config.default_chunk_size
};
let estimated_chunks =
(object_graph.total_bytes + chunk_size as u64 - 1) / chunk_size as u64;
ChunkingProfile {
chunk_size,
estimated_chunks,
repair_overhead_ratio: self.config.default_repair_overhead,
compression_ratio: 0.8, }
}
async fn analyze_disk_allocation(
&self,
destination_path: &Path,
object_graph: &ObjectGraphSummary,
) -> Result<DiskAllocationPlan, PlannerError> {
let parent_dir = destination_path
.parent()
.ok_or_else(|| PlannerError::InvalidInput("Invalid destination path".to_string()))?;
let available_space = available_space_bytes(parent_dir)?;
let required_space = object_graph.total_bytes;
let temp_space_needed = required_space / 4;
Ok(DiskAllocationPlan {
destination_path: destination_path.to_path_buf(),
required_space,
available_space,
preallocation_strategy: "sparse".to_string(),
temp_space_needed,
})
}
async fn generate_path_candidates(
&self,
options: &PlannerOptions,
) -> Result<Vec<PathCandidate>, PlannerError> {
let mut candidates = Vec::new();
match options.transfer_mode {
TransferMode::Direct => {
candidates.push(PathCandidate {
path_type: "direct".to_string(),
estimated_rtt: Duration::from_millis(10),
estimated_bandwidth: self.config.default_bandwidth_bps,
reliability_score: 0.9,
preferred: true,
});
}
TransferMode::RelayOnly => {
candidates.push(PathCandidate {
path_type: "relay".to_string(),
estimated_rtt: Duration::from_millis(50),
estimated_bandwidth: self.config.default_bandwidth_bps / 2,
reliability_score: 0.95,
preferred: true,
});
}
TransferMode::Mailbox => {
candidates.push(PathCandidate {
path_type: "mailbox".to_string(),
estimated_rtt: Duration::from_millis(100),
estimated_bandwidth: self.config.default_bandwidth_bps / 4,
reliability_score: 0.99,
preferred: true,
});
}
TransferMode::Swarm => {
for i in 0..3 {
candidates.push(PathCandidate {
path_type: format!("swarm_peer_{}", i),
estimated_rtt: Duration::from_millis(20 + i * 10),
estimated_bandwidth: self.config.default_bandwidth_bps / 3,
reliability_score: 0.8,
preferred: i == 0,
});
}
}
TransferMode::SparseImage => {
candidates.push(PathCandidate {
path_type: "direct_sparse".to_string(),
estimated_rtt: Duration::from_millis(15),
estimated_bandwidth: self.config.default_bandwidth_bps,
reliability_score: 0.85,
preferred: true,
});
}
}
Ok(candidates)
}
async fn analyze_cache_opportunities(
&self,
object_graph: &ObjectGraphSummary,
options: &PlannerOptions,
) -> Result<CacheAnalysis, PlannerError> {
let local_hit_ratio = if options.cache_enabled {
deterministic_local_cache_ratio(object_graph)
} else {
0.0
};
let remote_hit_ratio = if options.cache_enabled {
deterministic_remote_cache_ratio(object_graph)
} else {
0.0
};
let bytes_from_cache = (object_graph.total_bytes as f64 * local_hit_ratio) as u64;
let cache_locations = if options.cache_enabled {
cache_locations_for_ratios(local_hit_ratio, remote_hit_ratio)
} else {
vec![]
};
Ok(CacheAnalysis {
local_hit_ratio,
remote_hit_ratio,
bytes_from_cache,
cache_locations,
})
}
async fn check_resume_state(
&self,
destination_path: &PathBuf,
options: &PlannerOptions,
) -> Result<ResumeState, PlannerError> {
let resume_available = options.allow_resume && destination_path.exists();
let (bytes_completed, chunks_completed) = if resume_available {
let completed = completed_bytes_for_path(destination_path)?;
let chunks = completed.div_ceil(u64::from(self.config.default_chunk_size.max(1)));
(completed, chunks)
} else {
(0, 0)
};
Ok(ResumeState {
resume_available,
resume_token: if resume_available {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
SystemTime::now().hash(&mut hasher);
destination_path.hash(&mut hasher);
Some(format!("resume_{:x}", hasher.finish() as u32))
} else {
None
},
bytes_completed,
chunks_completed,
})
}
fn generate_governance_profile(&self, options: &PlannerOptions) -> ResourceGovernanceProfile {
ResourceGovernanceProfile {
max_connections: options.max_connections.unwrap_or(4),
bandwidth_limit: options.bandwidth_limit,
memory_limit: options.memory_limit,
cpu_limit: options.cpu_limit,
}
}
fn calculate_uncertainty(
&self,
path_candidates: &[PathCandidate],
options: &PlannerOptions,
) -> PlanUncertainty {
let bandwidth_confidence = path_candidates
.iter()
.map(|p| p.reliability_score)
.fold(0.0, f64::max);
let path_confidence = if path_candidates.len() > 1 { 0.9 } else { 0.7 };
let peer_confidence = match options.transfer_mode {
TransferMode::Direct => 0.8,
TransferMode::RelayOnly => 0.95,
TransferMode::Mailbox => 0.99,
TransferMode::Swarm => 0.7,
TransferMode::SparseImage => 0.8,
};
let resource_confidence = if options.bandwidth_limit.is_some() {
0.9
} else {
0.7
};
let uncertainty_factors = vec![
"Network conditions may change".to_string(),
"Peer availability not guaranteed".to_string(),
"Bandwidth estimation based on limited samples".to_string(),
];
PlanUncertainty {
bandwidth_confidence,
path_confidence,
peer_confidence,
resource_confidence,
uncertainty_factors,
}
}
fn estimate_bytes_on_wire(
&self,
object_graph: &ObjectGraphSummary,
chunking_profile: &ChunkingProfile,
) -> u64 {
let compressed_bytes =
(object_graph.total_bytes as f64 * chunking_profile.compression_ratio).floor() as u64;
let repair_bytes =
(compressed_bytes as f64 * chunking_profile.repair_overhead_ratio).floor() as u64;
let protocol_overhead = compressed_bytes / 100;
compressed_bytes + repair_bytes + protocol_overhead
}
fn estimate_transfer_duration(
&self,
bytes_on_wire: u64,
path_candidates: &[PathCandidate],
cache_analysis: &CacheAnalysis,
) -> Duration {
let best_bandwidth = path_candidates
.iter()
.map(|p| p.estimated_bandwidth)
.max()
.unwrap_or(self.config.default_bandwidth_bps);
let effective_bytes = bytes_on_wire.saturating_sub(cache_analysis.bytes_from_cache);
let transfer_seconds = effective_bytes as f64 / best_bandwidth as f64;
let setup_overhead = Duration::from_secs(5);
Duration::from_secs(transfer_seconds as u64) + setup_overhead
}
pub fn create_execution_tracker(&self, plan_id: String) -> PlanExecutionTracker {
PlanExecutionTracker::new(plan_id)
}
}
#[derive(Debug, Clone)]
pub struct PlannerOptions {
pub transfer_mode: TransferMode,
pub cache_enabled: bool,
pub allow_resume: bool,
pub max_connections: Option<u32>,
pub bandwidth_limit: Option<u64>,
pub memory_limit: Option<u64>,
pub cpu_limit: Option<f32>,
pub proof_outputs: Option<HashMap<String, String>>,
}
impl Default for PlannerOptions {
fn default() -> Self {
Self {
transfer_mode: TransferMode::Direct,
cache_enabled: true,
allow_resume: true,
max_connections: None,
bandwidth_limit: None,
memory_limit: None,
cpu_limit: None,
proof_outputs: None,
}
}
}
#[derive(Debug)]
pub struct PlanExecutionTracker {
plan_id: String,
started_at: SystemTime,
deviations: Vec<PlanDeviation>,
}
impl PlanExecutionTracker {
pub fn new(plan_id: String) -> Self {
Self {
plan_id,
started_at: SystemTime::now(),
deviations: Vec::new(),
}
}
pub fn record_deviation(
&mut self,
deviation_type: String,
expected_value: String,
actual_value: String,
reason: String,
impact_severity: String,
) {
let deviation = PlanDeviation {
timestamp: SystemTime::now(),
deviation_type,
expected_value,
actual_value,
reason,
impact_severity,
};
self.deviations.push(deviation);
}
pub fn generate_report(
self,
success: bool,
error_message: Option<String>,
final_stats: HashMap<String, serde_json::Value>,
) -> PlanExecutionReport {
PlanExecutionReport {
schema_version: ATP_PLAN_EXECUTION_REPORT_SCHEMA.to_string(),
plan_id: self.plan_id,
started_at: self.started_at,
completed_at: Some(SystemTime::now()),
deviations: self.deviations,
final_stats,
success,
error_message,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_lite::future::block_on;
use std::collections::HashMap;
use tempfile::TempDir;
#[test]
fn test_planner_creation() {
let planner = AtpTransferPlanner::new_default();
assert_eq!(planner.config.default_chunk_size, 64 * 1024);
}
#[test]
fn test_plan_id_generation() {
let planner = AtpTransferPlanner::new_default();
let id1 = planner.generate_plan_id();
let id2 = planner.generate_plan_id();
assert_ne!(id1, id2);
assert!(id1.starts_with("plan_"));
assert!(id2.starts_with("plan_"));
}
#[test]
fn test_object_graph_analysis_file() {
block_on(async {
let cx = Cx::for_testing();
let planner = AtpTransferPlanner::new_default();
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("test.txt");
std::fs::write(&test_file, b"test content").unwrap();
let result = planner.analyze_object_graph(&cx, &test_file).await.unwrap();
assert_eq!(result.object_count, 1);
assert_eq!(result.file_count, 1);
assert_eq!(result.directory_count, 0);
assert_eq!(result.total_bytes, 12); });
}
#[test]
fn test_planning_options_default() {
let options = PlannerOptions::default();
assert_eq!(options.transfer_mode, TransferMode::Direct);
assert!(options.cache_enabled);
assert!(options.allow_resume);
}
#[test]
fn test_chunking_profile_generation() {
let planner = AtpTransferPlanner::new_default();
let object_graph = ObjectGraphSummary {
object_count: 1,
total_bytes: 1024 * 1024, file_count: 1,
directory_count: 0,
largest_file_bytes: 1024 * 1024,
small_files_count: 0,
};
let profile = planner.generate_chunking_profile(&object_graph);
assert_eq!(profile.chunk_size, 64 * 1024);
assert_eq!(profile.estimated_chunks, 16); assert_eq!(profile.repair_overhead_ratio, 0.1);
let non_aligned_graph = ObjectGraphSummary {
total_bytes: (1024 * 1024) + 1,
..object_graph
};
let non_aligned_profile = planner.generate_chunking_profile(&non_aligned_graph);
assert_eq!(non_aligned_profile.estimated_chunks, 17);
}
#[test]
fn test_path_candidates_generation() {
block_on(async {
let planner = AtpTransferPlanner::new_default();
let options = PlannerOptions {
transfer_mode: TransferMode::Direct,
..Default::default()
};
let candidates = planner.generate_path_candidates(&options).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].path_type, "direct");
assert!(candidates[0].preferred);
});
}
#[test]
fn test_swarm_path_candidates() {
block_on(async {
let planner = AtpTransferPlanner::new_default();
let options = PlannerOptions {
transfer_mode: TransferMode::Swarm,
..Default::default()
};
let candidates = planner.generate_path_candidates(&options).await.unwrap();
assert_eq!(candidates.len(), 3);
assert!(candidates[0].preferred);
assert!(!candidates[1].preferred);
assert!(!candidates[2].preferred);
});
}
#[test]
fn test_cache_analysis() {
block_on(async {
let planner = AtpTransferPlanner::new_default();
let object_graph = ObjectGraphSummary {
object_count: 1,
total_bytes: 1024 * 1024,
file_count: 1,
directory_count: 0,
largest_file_bytes: 1024 * 1024,
small_files_count: 0,
};
let options = PlannerOptions {
cache_enabled: true,
..Default::default()
};
let cache_analysis = planner
.analyze_cache_opportunities(&object_graph, &options)
.await
.unwrap();
assert!(cache_analysis.local_hit_ratio > 0.0);
assert!(cache_analysis.bytes_from_cache > 0);
assert!(!cache_analysis.cache_locations.is_empty());
});
}
#[test]
fn test_bytes_on_wire_calculation() {
let planner = AtpTransferPlanner::new_default();
let object_graph = ObjectGraphSummary {
object_count: 1,
total_bytes: 1024 * 1024, file_count: 1,
directory_count: 0,
largest_file_bytes: 1024 * 1024,
small_files_count: 0,
};
let chunking_profile = ChunkingProfile {
chunk_size: 64 * 1024,
estimated_chunks: 16,
repair_overhead_ratio: 0.1,
compression_ratio: 0.8,
};
let bytes_on_wire = planner.estimate_bytes_on_wire(&object_graph, &chunking_profile);
assert_eq!(bytes_on_wire, 931134);
}
#[test]
fn test_execution_tracker() {
let mut tracker = PlanExecutionTracker::new("test_plan".to_string());
tracker.record_deviation(
"bandwidth".to_string(),
"10Mbps".to_string(),
"5Mbps".to_string(),
"network congestion".to_string(),
"medium".to_string(),
);
assert_eq!(tracker.deviations.len(), 1);
assert_eq!(tracker.deviations[0].deviation_type, "bandwidth");
let report = tracker.generate_report(true, None, HashMap::new());
assert_eq!(report.plan_id, "test_plan");
assert!(report.success);
assert!(report.completed_at.is_some());
assert_eq!(report.deviations.len(), 1);
}
}