use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};
use trustformers_core::Result;
use super::types::{
ChecksumAlgorithm, ConflictResolutionStrategy, MergeAlgorithm, NetworkAdaptationConfig,
SyncStrategy,
};
#[derive(Debug, Clone, PartialEq)]
pub enum SyncStatus {
Pending,
InProgress,
Completed,
Failed,
Cancelled,
}
#[derive(Debug, Clone)]
pub struct SyncRequest {
pub sync_id: String,
pub source_version: String,
pub target_version: String,
pub model_data: Vec<u8>,
pub priority: u8,
pub timestamp: Instant,
}
#[derive(Debug, Clone)]
pub struct ModelVersion {
pub version: String,
pub timestamp: Instant,
pub checksum: String,
pub size_bytes: u64,
pub parent_version: Option<String>,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone)]
pub struct SyncResponse {
pub sync_id: String,
pub status: SyncStatus,
pub timestamp: Instant,
pub result_data: Vec<u8>,
pub error_message: Option<String>,
pub metadata: HashMap<String, String>,
}
#[derive(Debug, Clone)]
pub struct ConflictMetadata {
pub conflict_id: String,
pub source_version: String,
pub target_version: String,
pub conflict_type: String,
pub timestamp: Instant,
pub resolution_strategy: ConflictResolutionStrategy,
pub metadata: HashMap<String, String>,
}
pub struct ModelSyncCoordinator {
config: NetworkAdaptationConfig,
sync_scheduler: SyncScheduler,
version_manager: ModelVersionManager,
conflict_resolver: ConflictResolver,
integrity_checker: IntegrityChecker,
}
pub struct SyncScheduler {
sync_strategy: SyncStrategy,
sync_queue: VecDeque<SyncRequest>,
active_syncs: HashMap<String, SyncStatus>,
}
pub struct ModelVersionManager {
current_version: String,
version_history: VecDeque<ModelVersion>,
conflict_detector: ConflictDetector,
}
pub struct ConflictDetector {
known_versions: HashMap<String, ModelVersion>,
conflict_resolution_strategy: ConflictResolutionStrategy,
}
pub struct ConflictResolver {
resolution_strategy: ConflictResolutionStrategy,
merge_algorithm: MergeAlgorithm,
conflict_history: VecDeque<ConflictMetadata>,
}
pub struct IntegrityChecker {
checksum_algorithm: ChecksumAlgorithm,
verification_cache: VerificationCache,
integrity_failures: VecDeque<(Instant, String)>,
}
pub struct VerificationCache {
cached_checksums: HashMap<String, String>,
cache_expiry: Duration,
hit_rate: f32,
}
impl ModelSyncCoordinator {
pub fn new(config: NetworkAdaptationConfig) -> Result<Self> {
Ok(Self {
config,
sync_scheduler: SyncScheduler::new(),
version_manager: ModelVersionManager::new(),
conflict_resolver: ConflictResolver::new(),
integrity_checker: IntegrityChecker::new(),
})
}
pub fn start(&mut self) -> Result<()> {
Ok(())
}
pub fn stop(&mut self) -> Result<()> {
self.sync_scheduler.cancel_all_syncs();
Ok(())
}
pub fn schedule_sync(&mut self, sync_request: SyncRequest) -> Result<String> {
if !self.validate_sync_request(&sync_request)? {
return Err(anyhow::anyhow!("Invalid sync request").into());
}
if let Some(conflict) = self.version_manager.detect_conflict(&sync_request)? {
let resolution = self.conflict_resolver.resolve_conflict(&conflict)?;
return Ok(format!("conflict_resolved:{}", resolution.resolution_id));
}
let sync_id = self.sync_scheduler.schedule_sync(sync_request)?;
Ok(sync_id)
}
pub fn execute_pending_syncs(&mut self) -> Result<Vec<SyncResponse>> {
let mut responses = Vec::new();
while let Some(sync_request) = self.sync_scheduler.get_next_sync() {
match self.execute_sync(&sync_request) {
Ok(response) => {
responses.push(response);
self.sync_scheduler.mark_sync_completed(&sync_request.sync_id);
},
Err(e) => {
self.sync_scheduler.mark_sync_failed(&sync_request.sync_id, &e.to_string());
},
}
}
Ok(responses)
}
fn execute_sync(&mut self, sync_request: &SyncRequest) -> Result<SyncResponse> {
if !self.integrity_checker.verify_integrity(&sync_request.model_data)? {
return Err(anyhow::anyhow!("Model integrity verification failed").into());
}
let merged_model = self.apply_sync(sync_request)?;
let new_version = self.version_manager.create_new_version(&merged_model)?;
Ok(SyncResponse {
sync_id: sync_request.sync_id.clone(),
status: SyncStatus::Completed,
timestamp: Instant::now(),
result_data: merged_model, error_message: None,
metadata: {
let mut metadata = HashMap::new();
metadata.insert("version".to_string(), new_version.clone());
metadata
},
})
}
fn apply_sync(&mut self, sync_request: &SyncRequest) -> Result<Vec<u8>> {
let strategy = if sync_request.priority >= 8 {
SyncStrategy::Immediate } else if sync_request.model_data.len() > 10_000_000 {
SyncStrategy::Batched } else {
SyncStrategy::Adaptive };
match strategy {
SyncStrategy::Immediate => {
Ok(sync_request.model_data.clone())
},
SyncStrategy::Batched => {
self.apply_incremental_sync(sync_request)
},
SyncStrategy::Adaptive => {
if sync_request.model_data.len() > 10_000_000 {
self.apply_incremental_sync(sync_request)
} else {
Ok(sync_request.model_data.clone())
}
},
SyncStrategy::Scheduled => {
Ok(sync_request.model_data.clone())
},
SyncStrategy::Opportunistic => {
Ok(sync_request.model_data.clone())
},
}
}
fn apply_incremental_sync(&mut self, sync_request: &SyncRequest) -> Result<Vec<u8>> {
let current_model = self.version_manager.get_current_model()?;
let mut updated_model = current_model;
for (i, &byte) in sync_request.model_data.iter().enumerate() {
if i < updated_model.len() {
updated_model[i] = byte;
} else {
updated_model.push(byte);
}
}
Ok(updated_model)
}
fn validate_sync_request(&self, sync_request: &SyncRequest) -> Result<bool> {
if sync_request.model_data.is_empty() {
return Ok(false);
}
if sync_request.source_version.is_empty() {
return Ok(false);
}
Ok(true)
}
pub fn get_sync_status(&self, sync_id: &str) -> Option<&SyncStatus> {
self.sync_scheduler.get_sync_status(sync_id)
}
pub fn get_conflict_history(&self) -> &VecDeque<ConflictMetadata> {
&self.conflict_resolver.conflict_history
}
pub fn get_integrity_stats(&self) -> (usize, f32) {
let failure_count = self.integrity_checker.integrity_failures.len();
let cache_hit_rate = self.integrity_checker.verification_cache.hit_rate;
(failure_count, cache_hit_rate)
}
pub fn update_config(&mut self, config: NetworkAdaptationConfig) -> Result<()> {
self.config = config;
Ok(())
}
pub fn force_sync(&mut self, model_data: Vec<u8>, reason: String) -> Result<String> {
let sync_request = SyncRequest {
sync_id: format!("force_sync_{}", Instant::now().elapsed().as_millis()),
source_version: self.version_manager.current_version.clone(),
target_version: format!("{}_forced", self.version_manager.current_version),
model_data,
priority: 10, timestamp: Instant::now(),
};
self.schedule_sync(sync_request)
}
}
impl SyncScheduler {
pub fn new() -> Self {
Self {
sync_strategy: SyncStrategy::Adaptive,
sync_queue: VecDeque::new(),
active_syncs: HashMap::new(),
}
}
pub fn schedule_sync(&mut self, sync_request: SyncRequest) -> Result<String> {
let sync_id = sync_request.sync_id.clone();
self.insert_by_priority(sync_request);
self.active_syncs.insert(sync_id.clone(), SyncStatus::Pending);
Ok(sync_id)
}
fn insert_by_priority(&mut self, sync_request: SyncRequest) {
let priority = sync_request.priority;
let insert_pos = self
.sync_queue
.iter()
.position(|req| req.priority < priority)
.unwrap_or(self.sync_queue.len());
self.sync_queue.insert(insert_pos, sync_request);
}
pub fn get_next_sync(&mut self) -> Option<SyncRequest> {
if let Some(sync_request) = self.sync_queue.pop_front() {
self.active_syncs.insert(sync_request.sync_id.clone(), SyncStatus::InProgress);
Some(sync_request)
} else {
None
}
}
pub fn mark_sync_completed(&mut self, sync_id: &str) {
self.active_syncs.insert(sync_id.to_string(), SyncStatus::Completed);
}
pub fn mark_sync_failed(&mut self, sync_id: &str, error: &str) {
self.active_syncs.insert(sync_id.to_string(), SyncStatus::Failed);
}
pub fn get_sync_status(&self, sync_id: &str) -> Option<&SyncStatus> {
self.active_syncs.get(sync_id)
}
pub fn cancel_all_syncs(&mut self) {
self.sync_queue.clear();
for (sync_id, status) in &mut self.active_syncs {
if matches!(status, SyncStatus::Pending | SyncStatus::InProgress) {
*status = SyncStatus::Cancelled;
}
}
}
pub fn get_queue_length(&self) -> usize {
self.sync_queue.len()
}
pub fn get_active_syncs_count(&self) -> usize {
self.active_syncs
.values()
.filter(|status| matches!(status, SyncStatus::InProgress))
.count()
}
}
impl ModelVersionManager {
pub fn new() -> Self {
Self {
current_version: "0.1.0".to_string(),
version_history: VecDeque::new(),
conflict_detector: ConflictDetector::new(),
}
}
pub fn create_new_version(&mut self, model_data: &[u8]) -> Result<String> {
let new_version = self.generate_next_version();
let version_record = ModelVersion {
version: new_version.clone(),
timestamp: Instant::now(),
checksum: self.calculate_checksum(model_data),
size_bytes: model_data.len() as u64,
parent_version: Some(self.current_version.clone()),
metadata: HashMap::new(),
};
self.version_history.push_back(version_record);
if self.version_history.len() > 100 {
self.version_history.pop_front();
}
self.current_version = new_version.clone();
Ok(new_version)
}
fn generate_next_version(&self) -> String {
let parts: Vec<&str> = self.current_version.split('.').collect();
if parts.len() == 3 {
if let (Ok(major), Ok(minor), Ok(patch)) = (
parts[0].parse::<u32>(),
parts[1].parse::<u32>(),
parts[2].parse::<u32>(),
) {
return format!("{}.{}.{}", major, minor, patch + 1);
}
}
format!("1.0.{}", Instant::now().elapsed().as_secs())
}
fn calculate_checksum(&self, model_data: &[u8]) -> String {
let sum: u64 = model_data.iter().map(|&b| b as u64).sum();
format!("{:016x}", sum)
}
pub fn detect_conflict(
&mut self,
sync_request: &SyncRequest,
) -> Result<Option<ConflictMetadata>> {
self.conflict_detector.detect_conflict(sync_request, &self.version_history)
}
pub fn get_current_model(&self) -> Result<Vec<u8>> {
Ok(vec![0u8; 1024]) }
pub fn get_version_history(&self) -> &VecDeque<ModelVersion> {
&self.version_history
}
pub fn rollback_to_version(&mut self, target_version: &str) -> Result<()> {
if let Some(version_record) =
self.version_history.iter().find(|v| v.version == target_version)
{
self.current_version = version_record.version.clone();
Ok(())
} else {
Err(anyhow::anyhow!("Version {} not found", target_version).into())
}
}
}
impl ConflictDetector {
pub fn new() -> Self {
Self {
known_versions: HashMap::new(),
conflict_resolution_strategy: ConflictResolutionStrategy::LastWriterWins,
}
}
pub fn detect_conflict(
&mut self,
sync_request: &SyncRequest,
version_history: &VecDeque<ModelVersion>,
) -> Result<Option<ConflictMetadata>> {
let source_exists =
version_history.iter().any(|v| v.version == sync_request.source_version);
if !source_exists {
let conflict = ConflictMetadata {
conflict_id: format!("conflict_{}", Instant::now().elapsed().as_millis()),
source_version: sync_request.source_version.clone(),
target_version: sync_request.target_version.clone(),
conflict_type: "version_mismatch".to_string(),
timestamp: Instant::now(),
resolution_strategy: self.conflict_resolution_strategy,
metadata: HashMap::new(),
};
return Ok(Some(conflict));
}
if self.has_concurrent_modifications(sync_request, version_history)? {
let conflict = ConflictMetadata {
conflict_id: format!("concurrent_{}", Instant::now().elapsed().as_millis()),
source_version: sync_request.source_version.clone(),
target_version: sync_request.target_version.clone(),
conflict_type: "concurrent_modification".to_string(),
timestamp: Instant::now(),
resolution_strategy: self.conflict_resolution_strategy,
metadata: HashMap::new(),
};
return Ok(Some(conflict));
}
Ok(None)
}
fn has_concurrent_modifications(
&self,
sync_request: &SyncRequest,
version_history: &VecDeque<ModelVersion>,
) -> Result<bool> {
if let Some(source_version_record) =
version_history.iter().find(|v| v.version == sync_request.source_version)
{
let concurrent_count = version_history
.iter()
.filter(|v| v.timestamp > source_version_record.timestamp)
.count();
return Ok(concurrent_count > 0);
}
Ok(false)
}
}
impl ConflictResolver {
pub fn new() -> Self {
Self {
resolution_strategy: ConflictResolutionStrategy::LastWriterWins,
merge_algorithm: MergeAlgorithm::WeightedMerge,
conflict_history: VecDeque::new(),
}
}
pub fn resolve_conflict(&mut self, conflict: &ConflictMetadata) -> Result<ConflictResolution> {
let resolution = match &self.resolution_strategy {
ConflictResolutionStrategy::LastWriterWins => {
self.resolve_last_writer_wins(conflict)?
},
ConflictResolutionStrategy::ServerDecision => {
self.resolve_first_writer_wins(conflict)?
},
ConflictResolutionStrategy::MergeConflicts => self.resolve_merge_strategy(conflict)?,
ConflictResolutionStrategy::UserDecision => self.resolve_user_intervention(conflict)?,
ConflictResolutionStrategy::VersionVector => {
self.resolve_last_writer_wins(conflict)?
},
};
self.conflict_history.push_back(conflict.clone());
if self.conflict_history.len() > 50 {
self.conflict_history.pop_front();
}
Ok(resolution)
}
fn resolve_last_writer_wins(&self, conflict: &ConflictMetadata) -> Result<ConflictResolution> {
Ok(ConflictResolution {
resolution_id: format!("lww_{}", conflict.conflict_id),
resolution_strategy: ConflictResolutionStrategy::LastWriterWins,
resolved_version: conflict.target_version.clone(),
timestamp: Instant::now(),
metadata: HashMap::new(),
})
}
fn resolve_first_writer_wins(&self, conflict: &ConflictMetadata) -> Result<ConflictResolution> {
Ok(ConflictResolution {
resolution_id: format!("fww_{}", conflict.conflict_id),
resolution_strategy: ConflictResolutionStrategy::ServerDecision,
resolved_version: conflict.source_version.clone(),
timestamp: Instant::now(),
metadata: HashMap::new(),
})
}
fn resolve_merge_strategy(&self, conflict: &ConflictMetadata) -> Result<ConflictResolution> {
let merged_version = format!(
"merged_{}_{}",
conflict.source_version, conflict.target_version
);
Ok(ConflictResolution {
resolution_id: format!("merge_{}", conflict.conflict_id),
resolution_strategy: ConflictResolutionStrategy::MergeConflicts,
resolved_version: merged_version,
timestamp: Instant::now(),
metadata: HashMap::new(),
})
}
fn resolve_user_intervention(&self, conflict: &ConflictMetadata) -> Result<ConflictResolution> {
Ok(ConflictResolution {
resolution_id: format!("user_{}", conflict.conflict_id),
resolution_strategy: ConflictResolutionStrategy::UserDecision,
resolved_version: "pending_user_input".to_string(),
timestamp: Instant::now(),
metadata: [("requires_user_input".to_string(), "true".to_string())]
.iter()
.cloned()
.collect(),
})
}
pub fn get_resolution_stats(&self) -> HashMap<String, u32> {
let mut stats = HashMap::new();
for conflict in &self.conflict_history {
let strategy_name = format!("{:?}", conflict.resolution_strategy);
*stats.entry(strategy_name).or_insert(0) += 1;
}
stats
}
}
impl IntegrityChecker {
pub fn new() -> Self {
Self {
checksum_algorithm: ChecksumAlgorithm::SHA256,
verification_cache: VerificationCache::new(),
integrity_failures: VecDeque::new(),
}
}
pub fn verify_integrity(&mut self, model_data: &[u8]) -> Result<bool> {
let model_hash = self.calculate_hash(model_data);
if let Some(cached_result) = self.verification_cache.get(&model_hash) {
return Ok(cached_result);
}
let is_valid = self.perform_integrity_check(model_data)?;
self.verification_cache.insert(model_hash, is_valid);
if !is_valid {
self.integrity_failures
.push_back((Instant::now(), "integrity_check_failed".to_string()));
if self.integrity_failures.len() > 100 {
self.integrity_failures.pop_front();
}
}
Ok(is_valid)
}
fn calculate_hash(&self, model_data: &[u8]) -> String {
match self.checksum_algorithm {
ChecksumAlgorithm::SHA256 => {
let sum: u64 = model_data.iter().map(|&b| b as u64).sum();
format!("{:016x}", sum)
},
ChecksumAlgorithm::MD5 => {
let sum: u32 = model_data.iter().map(|&b| b as u32).sum();
format!("{:08x}", sum)
},
ChecksumAlgorithm::CRC32 => {
let sum: u32 = model_data.iter().map(|&b| b as u32).sum();
format!("{:08x}", sum ^ 0xFFFFFFFF)
},
ChecksumAlgorithm::Custom => {
let sum: u64 =
model_data.iter().enumerate().map(|(i, &b)| (b as u64) * (i as u64 + 1)).sum();
format!("{:016x}", sum)
},
}
}
fn perform_integrity_check(&self, model_data: &[u8]) -> Result<bool> {
if model_data.is_empty() {
return Ok(false);
}
if model_data.len() > 1_000_000_000 {
return Ok(false);
}
Ok(true)
}
pub fn get_failure_rate(&self) -> f32 {
if self.integrity_failures.is_empty() {
return 0.0;
}
let one_hour_ago = Instant::now() - Duration::from_secs(3600);
let recent_failures = self
.integrity_failures
.iter()
.filter(|(timestamp, _)| *timestamp > one_hour_ago)
.count();
recent_failures as f32 / 100.0 }
}
impl VerificationCache {
pub fn new() -> Self {
Self {
cached_checksums: HashMap::new(),
cache_expiry: Duration::from_secs(3600), hit_rate: 0.0,
}
}
pub fn get(&mut self, key: &str) -> Option<bool> {
if let Some(result) = self.cached_checksums.get(key) {
self.hit_rate = (self.hit_rate * 0.9) + (1.0 * 0.1);
Some(result == "valid")
} else {
self.hit_rate *= 0.9;
None
}
}
pub fn insert(&mut self, key: String, is_valid: bool) {
let value = if is_valid { "valid" } else { "invalid" };
self.cached_checksums.insert(key, value.to_string());
if self.cached_checksums.len() > 1000 {
let keys_to_remove: Vec<String> =
self.cached_checksums.keys().take(100).cloned().collect();
for key in keys_to_remove {
self.cached_checksums.remove(&key);
}
}
}
pub fn get_hit_rate(&self) -> f32 {
self.hit_rate
}
pub fn cleanup_expired(&mut self) {
if self.cached_checksums.len() > 500 {
self.cached_checksums.clear();
}
}
}
#[derive(Debug, Clone)]
pub struct ConflictResolution {
pub resolution_id: String,
pub resolution_strategy: ConflictResolutionStrategy,
pub resolved_version: String,
pub timestamp: Instant,
pub metadata: HashMap<String, String>,
}
impl Default for SyncScheduler {
fn default() -> Self {
Self::new()
}
}
impl Default for ModelVersionManager {
fn default() -> Self {
Self::new()
}
}
impl Default for ConflictDetector {
fn default() -> Self {
Self::new()
}
}
impl Default for ConflictResolver {
fn default() -> Self {
Self::new()
}
}
impl Default for IntegrityChecker {
fn default() -> Self {
Self::new()
}
}
impl Default for VerificationCache {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::super::types::{
ChecksumAlgorithm, ConflictResolutionStrategy, MergeAlgorithm, NetworkAdaptationConfig,
SyncStrategy,
};
use super::*;
use std::time::Instant;
fn lcg_f32(state: &mut u64) -> f32 {
*state = state.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
(*state % 1000) as f32 / 1000.0
}
fn make_sync_request(id: &str, version: &str, data: Vec<u8>, priority: u8) -> SyncRequest {
SyncRequest {
sync_id: id.to_string(),
source_version: version.to_string(),
target_version: format!("{}_next", version),
model_data: data,
priority,
timestamp: Instant::now(),
}
}
#[test]
fn test_model_sync_coordinator_new() {
let config = NetworkAdaptationConfig::default();
let coordinator = ModelSyncCoordinator::new(config);
assert!(coordinator.is_ok());
}
#[test]
fn test_model_sync_coordinator_start_stop() {
let config = NetworkAdaptationConfig::default();
let mut coordinator = ModelSyncCoordinator::new(config)
.unwrap_or_else(|_| panic!("coordinator creation failed"));
assert!(coordinator.start().is_ok());
assert!(coordinator.stop().is_ok());
}
#[test]
fn test_schedule_sync_with_valid_request() {
let config = NetworkAdaptationConfig::default();
let mut coordinator = ModelSyncCoordinator::new(config)
.unwrap_or_else(|_| panic!("coordinator creation failed"));
let request = make_sync_request("sync_1", "1.0.0", vec![1u8, 2u8, 3u8], 5);
let result = coordinator.schedule_sync(request);
assert!(result.is_ok());
}
#[test]
fn test_schedule_sync_empty_data_fails() {
let config = NetworkAdaptationConfig::default();
let mut coordinator = ModelSyncCoordinator::new(config)
.unwrap_or_else(|_| panic!("coordinator creation failed"));
let request = make_sync_request("sync_empty", "1.0.0", vec![], 5);
let result = coordinator.schedule_sync(request);
assert!(result.is_err());
}
#[test]
fn test_schedule_sync_empty_source_version_fails() {
let config = NetworkAdaptationConfig::default();
let mut coordinator = ModelSyncCoordinator::new(config)
.unwrap_or_else(|_| panic!("coordinator creation failed"));
let request = SyncRequest {
sync_id: "sync_no_version".to_string(),
source_version: "".to_string(),
target_version: "1.0.1".to_string(),
model_data: vec![1, 2, 3],
priority: 5,
timestamp: Instant::now(),
};
let result = coordinator.schedule_sync(request);
assert!(result.is_err());
}
#[test]
fn test_get_sync_status_unknown_returns_none() {
let config = NetworkAdaptationConfig::default();
let coordinator = ModelSyncCoordinator::new(config)
.unwrap_or_else(|_| panic!("coordinator creation failed"));
assert!(coordinator.get_sync_status("nonexistent").is_none());
}
#[test]
fn test_get_integrity_stats_initial() {
let config = NetworkAdaptationConfig::default();
let coordinator = ModelSyncCoordinator::new(config)
.unwrap_or_else(|_| panic!("coordinator creation failed"));
let (failure_count, hit_rate) = coordinator.get_integrity_stats();
assert_eq!(failure_count, 0);
assert_eq!(hit_rate, 0.0);
}
#[test]
fn test_get_conflict_history_initial_empty() {
let config = NetworkAdaptationConfig::default();
let coordinator = ModelSyncCoordinator::new(config)
.unwrap_or_else(|_| panic!("coordinator creation failed"));
assert!(coordinator.get_conflict_history().is_empty());
}
#[test]
fn test_sync_scheduler_new_queue_empty() {
let scheduler = SyncScheduler::new();
assert_eq!(scheduler.get_queue_length(), 0);
}
#[test]
fn test_sync_scheduler_schedule_and_retrieve() {
let mut scheduler = SyncScheduler::new();
let request = make_sync_request("s1", "1.0.0", vec![1, 2], 5);
let result = scheduler.schedule_sync(request);
assert!(result.is_ok());
assert_eq!(scheduler.get_queue_length(), 1);
}
#[test]
fn test_sync_scheduler_get_next_clears_queue() {
let mut scheduler = SyncScheduler::new();
let request = make_sync_request("s2", "1.0.0", vec![1, 2], 5);
let _ = scheduler.schedule_sync(request);
let next = scheduler.get_next_sync();
assert!(next.is_some());
assert_eq!(scheduler.get_queue_length(), 0);
}
#[test]
fn test_sync_scheduler_mark_completed() {
let mut scheduler = SyncScheduler::new();
let request = make_sync_request("s3", "1.0.0", vec![1, 2], 5);
let _ = scheduler.schedule_sync(request);
let next = scheduler.get_next_sync();
let sync_id = next.map(|r| r.sync_id).unwrap_or_default();
scheduler.mark_sync_completed(&sync_id);
let status = scheduler.get_sync_status(&sync_id);
assert!(matches!(status, Some(SyncStatus::Completed)));
}
#[test]
fn test_sync_scheduler_cancel_all() {
let mut scheduler = SyncScheduler::new();
for i in 0..3 {
let req = make_sync_request(&format!("s{}", i), "1.0.0", vec![1, 2], 5);
let _ = scheduler.schedule_sync(req);
}
scheduler.cancel_all_syncs();
assert_eq!(scheduler.get_queue_length(), 0);
}
#[test]
fn test_sync_scheduler_priority_ordering() {
let mut scheduler = SyncScheduler::new();
let low_req = make_sync_request("low", "1.0.0", vec![1], 1);
let high_req = make_sync_request("high", "1.0.0", vec![1], 9);
let _ = scheduler.schedule_sync(low_req);
let _ = scheduler.schedule_sync(high_req);
let next = scheduler.get_next_sync();
assert!(next.map(|r| r.sync_id == "high").unwrap_or(false));
}
#[test]
fn test_model_version_manager_initial_version() {
let manager = ModelVersionManager::new();
assert_eq!(manager.current_version, "0.1.0");
assert!(manager.version_history.is_empty());
}
#[test]
fn test_model_version_manager_create_new_version() {
let mut manager = ModelVersionManager::new();
let result = manager.create_new_version(&[1u8, 2u8, 3u8]);
assert!(result.is_ok());
let new_version = result.unwrap_or_default();
assert!(!new_version.is_empty());
assert_eq!(manager.version_history.len(), 1);
}
#[test]
fn test_model_version_manager_version_increments() {
let mut manager = ModelVersionManager::new();
let v1 = manager.create_new_version(&[1u8]).unwrap_or_default();
assert_eq!(v1, "0.1.1");
let v2 = manager.create_new_version(&[2u8]).unwrap_or_default();
assert_eq!(v2, "0.1.2");
}
#[test]
fn test_integrity_checker_verify_valid_data() {
let mut checker = IntegrityChecker::new();
let result = checker.verify_integrity(&[1u8, 2u8, 3u8]);
assert!(result.is_ok());
assert!(result.unwrap_or(false));
}
#[test]
fn test_integrity_checker_reject_empty_data() {
let mut checker = IntegrityChecker::new();
let result = checker.verify_integrity(&[]);
assert!(result.is_ok());
assert!(!result.unwrap_or(true));
}
#[test]
fn test_integrity_checker_initial_failure_rate_zero() {
let checker = IntegrityChecker::new();
assert_eq!(checker.get_failure_rate(), 0.0);
}
#[test]
fn test_verification_cache_miss_returns_none() {
let mut cache = VerificationCache::new();
assert!(cache.get("nonexistent_key").is_none());
}
#[test]
fn test_verification_cache_insert_and_hit() {
let mut cache = VerificationCache::new();
cache.insert("my_hash".to_string(), true);
let result = cache.get("my_hash");
assert!(result.is_some());
assert!(result.unwrap_or(false));
}
#[test]
fn test_verification_cache_hit_rate_updates() {
let mut cache = VerificationCache::new();
cache.insert("key1".to_string(), true);
let _ = cache.get("key1");
assert!(cache.get_hit_rate() > 0.0);
}
#[test]
fn test_conflict_resolver_resolve_last_writer_wins() {
let mut resolver = ConflictResolver::new();
let conflict = ConflictMetadata {
conflict_id: "c1".to_string(),
source_version: "1.0.0".to_string(),
target_version: "1.0.1".to_string(),
conflict_type: "version_mismatch".to_string(),
timestamp: Instant::now(),
resolution_strategy: ConflictResolutionStrategy::LastWriterWins,
metadata: std::collections::HashMap::new(),
};
let result = resolver.resolve_conflict(&conflict);
assert!(result.is_ok());
let resolution = result.unwrap_or_else(|_| panic!("resolution failed"));
assert!(resolution.resolution_id.starts_with("lww_"));
assert_eq!(resolution.resolved_version, "1.0.1");
}
#[test]
fn test_conflict_resolver_history_grows() {
let mut resolver = ConflictResolver::new();
for i in 0..3 {
let conflict = ConflictMetadata {
conflict_id: format!("c{}", i),
source_version: "1.0.0".to_string(),
target_version: "1.0.1".to_string(),
conflict_type: "version_mismatch".to_string(),
timestamp: Instant::now(),
resolution_strategy: ConflictResolutionStrategy::LastWriterWins,
metadata: std::collections::HashMap::new(),
};
let _ = resolver.resolve_conflict(&conflict);
}
assert_eq!(resolver.conflict_history.len(), 3);
}
#[test]
fn test_sync_status_variants() {
assert!(matches!(SyncStatus::Pending, SyncStatus::Pending));
assert!(matches!(SyncStatus::InProgress, SyncStatus::InProgress));
assert!(matches!(SyncStatus::Completed, SyncStatus::Completed));
assert!(matches!(SyncStatus::Failed, SyncStatus::Failed));
assert!(matches!(SyncStatus::Cancelled, SyncStatus::Cancelled));
}
#[test]
fn test_update_config_ok() {
let config = NetworkAdaptationConfig::default();
let mut coordinator = ModelSyncCoordinator::new(config)
.unwrap_or_else(|_| panic!("coordinator creation failed"));
let new_config = NetworkAdaptationConfig::default();
assert!(coordinator.update_config(new_config).is_ok());
}
}