#![allow(dead_code)]
#[cfg(test)]
mod tests {
use crate::atp::actor::{TransferActorTopology, TransferChildRole, TransferRegionId};
use crate::atp::transfer::{
IdempotencyKey, PeerCapabilities, TransferActor, TransferActorId, TransferCancelPhase,
TransferCommand, TransferCommandKind, TransferFailureKind, TransferId, TransferManifestRef,
TransferObligationId, TransferProgress, TransferState,
};
use serde_json::json;
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
#[derive(Debug)]
struct TransferTestLogger {
suite_name: String,
test_name: String,
start_time: SystemTime,
current_phase: String,
transfer_snapshots: Vec<TransferSnapshot>,
}
#[derive(Debug, Clone)]
struct TransferSnapshot {
label: String,
transfer_state: String,
progress: TransferProgressSnapshot,
timestamp: SystemTime,
}
#[derive(Debug, Clone)]
struct TransferProgressSnapshot {
offered_bytes: u64,
verified_bytes: u64,
committed_bytes: u64,
repair_symbols: u64,
}
impl TransferTestLogger {
fn new(suite: &str, test: &str) -> Self {
let logger = Self {
suite_name: suite.to_string(),
test_name: test.to_string(),
start_time: SystemTime::now(),
current_phase: "init".to_string(),
transfer_snapshots: Vec::new(),
};
eprintln!(
"{}",
json!({
"ts": logger.start_time,
"suite": suite,
"test": test,
"event": "transfer_test_start"
})
);
logger
}
fn phase(&mut self, phase: &str) {
self.current_phase = phase.to_string();
eprintln!(
"{}",
json!({
"ts": SystemTime::now(),
"suite": self.suite_name,
"test": self.test_name,
"phase": phase,
"event": "transfer_phase_start"
})
);
}
fn transfer_snapshot(&mut self, label: &str, actor: &TransferActor) {
let snapshot = TransferSnapshot {
label: label.to_string(),
transfer_state: format!("{:?}", actor.state()),
progress: TransferProgressSnapshot {
offered_bytes: actor.progress.offered_bytes,
verified_bytes: actor.progress.verified_bytes,
committed_bytes: actor.progress.committed_bytes,
repair_symbols: actor.progress.repair_symbols,
},
timestamp: SystemTime::now(),
};
eprintln!(
"{}",
json!({
"ts": snapshot.timestamp,
"suite": self.suite_name,
"test": self.test_name,
"phase": self.current_phase,
"event": "transfer_snapshot",
"label": label,
"transfer_id": actor.transfer_id.to_hex(),
"state": snapshot.transfer_state,
"progress": {
"offered_bytes": snapshot.progress.offered_bytes,
"verified_bytes": snapshot.progress.verified_bytes,
"committed_bytes": snapshot.progress.committed_bytes,
"repair_symbols": snapshot.progress.repair_symbols,
"in_flight_bytes": snapshot.progress.offered_bytes.saturating_sub(snapshot.progress.verified_bytes)
}
})
);
self.transfer_snapshots.push(snapshot);
}
fn assert_transfer_state(
&self,
label: &str,
expected_state: TransferState,
actor: &TransferActor,
) -> bool {
let actual_state = actor.state();
let matches = expected_state == actual_state;
eprintln!(
"{}",
json!({
"ts": SystemTime::now(),
"suite": self.suite_name,
"test": self.test_name,
"phase": self.current_phase,
"event": "transfer_assertion",
"label": label,
"field": "transfer_state",
"expected": format!("{:?}", expected_state),
"actual": format!("{:?}", actual_state),
"match": matches
})
);
matches
}
fn test_end(&self, result: &str) {
let duration_ms = self
.start_time
.elapsed()
.unwrap_or(Duration::ZERO)
.as_millis() as u64;
eprintln!(
"{}",
json!({
"ts": SystemTime::now(),
"suite": self.suite_name,
"test": self.test_name,
"event": "transfer_test_end",
"result": result,
"duration_ms": duration_ms,
"total_snapshots": self.transfer_snapshots.len()
})
);
}
}
struct TransferScenarioFactory;
impl TransferScenarioFactory {
fn create_test_manifest(object_count: u32) -> TransferManifestRef {
TransferManifestRef {
schema_version: 1,
merkle_root: Self::generate_test_merkle_root(object_count),
object_count: u64::from(object_count),
}
}
fn generate_test_merkle_root(seed: u32) -> [u8; 32] {
let mut root = [0u8; 32];
for (i, byte) in root.iter_mut().enumerate() {
*byte = ((seed + i as u32) % 256) as u8;
}
root
}
fn create_test_topology(region_base: u32) -> TransferActorTopology {
let supervisor = TransferRegionId::new(u64::from(region_base));
let actor_region = TransferRegionId::new(u64::from(region_base + 1));
TransferActorTopology::new(supervisor, actor_region)
.with_child(
TransferRegionId::new(u64::from(region_base + 2)),
TransferChildRole::PathRace,
)
.with_child(
TransferRegionId::new(u64::from(region_base + 3)),
TransferChildRole::Writer,
)
.with_child(
TransferRegionId::new(u64::from(region_base + 4)),
TransferChildRole::Finalizer,
)
}
fn create_test_transfer_id(entropy: u64) -> TransferId {
let mut peer_id = [0u8; 32];
let mut nonce = [0u8; 32];
let mut manifest_hash = [0u8; 32];
let mut policy_hash = [0u8; 32];
for i in 0..32 {
peer_id[i] = ((entropy + i as u64) % 256) as u8;
nonce[i] = ((entropy * 2 + i as u64) % 256) as u8;
manifest_hash[i] = ((entropy * 3 + i as u64) % 256) as u8;
policy_hash[i] = ((entropy * 4 + i as u64) % 256) as u8;
}
TransferId::derive(peer_id, nonce, manifest_hash, policy_hash)
}
fn create_test_peer_capabilities() -> PeerCapabilities {
PeerCapabilities {
relay: true,
mailbox: true,
swarm: true,
max_inflight_obligations: 8,
}
}
fn create_transfer_actor(
actor_id: u32,
entropy: u64,
) -> Result<TransferActor, Box<dyn std::error::Error>> {
Ok(TransferActor::new(
TransferActorId::new(u64::from(actor_id)),
Self::create_test_transfer_id(entropy),
Self::create_test_manifest(5), Self::create_test_peer_capabilities(),
Self::create_test_topology(actor_id * 10),
)?)
}
fn command(key: u128, kind: TransferCommandKind) -> TransferCommand {
TransferCommand::new(IdempotencyKey::new(key), kind)
}
fn obligation(raw: u64) -> TransferObligationId {
TransferObligationId::new(raw)
}
}
struct TransferTestIsolation {
created_actors: Vec<TransferActorId>,
}
impl TransferTestIsolation {
fn new() -> Self {
Self {
created_actors: Vec::new(),
}
}
fn track_actor(&mut self, actor_id: TransferActorId) {
self.created_actors.push(actor_id);
}
}
impl Drop for TransferTestIsolation {
fn drop(&mut self) {
eprintln!(
"TransferTestIsolation: cleaned {} actors",
self.created_actors.len()
);
}
}
#[test]
fn transfer_actor_lifecycle_integration() {
let mut log = TransferTestLogger::new("transfer_integration", "actor_lifecycle");
let mut isolation = TransferTestIsolation::new();
log.phase("setup");
log.phase("actor_creation");
let mut actor = TransferScenarioFactory::create_transfer_actor(100, 0x1234567890abcdef)
.expect("create transfer actor");
isolation.track_actor(actor.actor_id);
log.transfer_snapshot("initial_actor_state", &actor);
assert!(log.assert_transfer_state("initial_state", TransferState::Offered, &actor));
log.phase("transfer_start");
actor
.apply(TransferScenarioFactory::command(
1,
TransferCommandKind::Accept {
obligation: TransferScenarioFactory::obligation(1),
},
))
.expect("accept transfer");
let start_result = actor.apply(TransferScenarioFactory::command(
2,
TransferCommandKind::Start {
path_id: 1,
obligation: TransferScenarioFactory::obligation(2),
},
));
log.transfer_snapshot("post_start_state", &actor);
match start_result {
Ok(_) => {
assert!(log.assert_transfer_state("started_state", TransferState::Running, &actor));
}
Err(e) => {
eprintln!("Transfer start failed: {:?}", e);
panic!("Transfer start should succeed");
}
}
log.phase("progress_simulation");
actor.progress.offered_bytes = 10_240; actor.progress.verified_bytes = 5_120; actor.progress.committed_bytes = 2_048; actor.progress.repair_symbols = 3;
log.transfer_snapshot("progress_updated", &actor);
let in_flight = actor.progress.offered_bytes - actor.progress.verified_bytes;
let pending_commit = actor.progress.verified_bytes - actor.progress.committed_bytes;
assert_eq!(in_flight, 5_120);
assert_eq!(pending_commit, 3_072);
log.phase("pressure_monitoring");
let pressure_snapshot = actor.pressure_snapshot("transfer_lifecycle_test", 1);
eprintln!(
"{}",
json!({
"ts": SystemTime::now(),
"suite": log.suite_name,
"test": log.test_name,
"phase": log.current_phase,
"event": "pressure_snapshot",
"transfer_id": pressure_snapshot.transfer_id,
"in_flight_bytes": pressure_snapshot.in_flight_bytes,
"receive_buffer_queued_bytes": pressure_snapshot.receive_buffer_queued_bytes
})
);
assert!(pressure_snapshot.in_flight_bytes.unwrap_or(0) > 0);
log.phase("transfer_completion");
let commit_result = actor.apply(TransferScenarioFactory::command(
3,
TransferCommandKind::Commit {
obligation: TransferScenarioFactory::obligation(3),
},
));
log.transfer_snapshot("post_commit_state", &actor);
match commit_result {
Ok(_) => {
assert!(log.assert_transfer_state(
"completed_state",
TransferState::Committed,
&actor
));
}
Err(e) => {
eprintln!("Transfer commit failed: {:?}", e);
}
}
log.phase("teardown");
log.test_end("pass");
}
#[test]
fn multi_peer_transfer_coordination_integration() {
let mut log = TransferTestLogger::new("transfer_integration", "multi_peer_coordination");
let mut isolation = TransferTestIsolation::new();
log.phase("setup");
log.phase("multi_actor_creation");
let mut actors = Vec::new();
for i in 0..3 {
let actor = TransferScenarioFactory::create_transfer_actor(
200 + i,
0x1111222233334444 + i as u64 * 0x1000000000000000,
)
.expect("create transfer actor");
isolation.track_actor(actor.actor_id);
log.transfer_snapshot(&format!("initial_actor_{}", i), &actor);
actors.push(actor);
}
log.phase("coordination_setup");
for (i, actor) in actors.iter_mut().enumerate() {
actor
.apply(TransferScenarioFactory::command(
100 + i as u128,
TransferCommandKind::Accept {
obligation: TransferScenarioFactory::obligation(100 + i as u64),
},
))
.expect("accept transfer");
let start_result = actor.apply(TransferScenarioFactory::command(
200 + i as u128,
TransferCommandKind::Start {
path_id: i as u64 + 1,
obligation: TransferScenarioFactory::obligation(200 + i as u64),
},
));
log.transfer_snapshot(&format!("started_actor_{}", i), actor);
if let Err(e) = start_result {
eprintln!("Actor {} start failed: {:?}", i, e);
}
}
log.phase("progress_coordination");
for (i, actor) in actors.iter_mut().enumerate() {
let base_progress = (i + 1) as u64 * 1024;
actor.progress.offered_bytes = base_progress * 8;
actor.progress.verified_bytes = base_progress * 6;
actor.progress.committed_bytes = base_progress * 4;
actor.progress.repair_symbols = i as u64 * 2;
log.transfer_snapshot(&format!("coordinated_progress_actor_{}", i), actor);
}
log.phase("verification");
let mut transfer_ids = std::collections::HashSet::new();
let mut region_ids = std::collections::HashSet::new();
for (i, actor) in actors.iter().enumerate() {
assert!(
transfer_ids.insert(actor.transfer_id.to_hex()),
"Transfer ID should be unique for actor {}",
i
);
assert!(
region_ids.insert(actor.topology.actor_region),
"Actor region should be unique for actor {}",
i
);
assert!(
actor.progress.verified_bytes <= actor.progress.offered_bytes,
"Verified bytes should not exceed offered bytes for actor {}",
i
);
assert!(
actor.progress.committed_bytes <= actor.progress.verified_bytes,
"Committed bytes should not exceed verified bytes for actor {}",
i
);
}
log.phase("teardown");
log.test_end("pass");
}
#[test]
fn transfer_cancellation_and_failure_handling_integration() {
let mut log =
TransferTestLogger::new("transfer_integration", "cancellation_failure_handling");
let mut isolation = TransferTestIsolation::new();
log.phase("setup");
log.phase("actor_setup_for_cancellation");
let mut actor = TransferScenarioFactory::create_transfer_actor(300, 0xabcdef1234567890)
.expect("create transfer actor");
isolation.track_actor(actor.actor_id);
log.transfer_snapshot("pre_cancel_initial", &actor);
actor
.apply(TransferScenarioFactory::command(
1,
TransferCommandKind::Accept {
obligation: TransferScenarioFactory::obligation(1),
},
))
.expect("accept before cancellation");
let _ = actor.apply(TransferScenarioFactory::command(
2,
TransferCommandKind::Start {
path_id: 1,
obligation: TransferScenarioFactory::obligation(2),
},
));
log.transfer_snapshot("pre_cancel_started", &actor);
log.phase("cancellation_workflow");
let cancel_result = actor.apply(TransferScenarioFactory::command(
3,
TransferCommandKind::Cancel {
phase: TransferCancelPhase::Requested,
},
));
log.transfer_snapshot("post_cancel_request", &actor);
match cancel_result {
Ok(_) => {
assert!(log.assert_transfer_state(
"cancelled_state",
TransferState::Cancelling,
&actor
));
}
Err(e) => {
eprintln!("Transfer cancellation failed: {:?}", e);
}
}
log.phase("failure_simulation");
let mut failure_actor =
TransferScenarioFactory::create_transfer_actor(301, 0xfedcba0987654321)
.expect("create failure test actor");
isolation.track_actor(failure_actor.actor_id);
log.transfer_snapshot("failure_actor_initial", &failure_actor);
let fail_result = failure_actor.apply(TransferScenarioFactory::command(
4,
TransferCommandKind::Fail {
kind: TransferFailureKind::Peer,
},
));
log.transfer_snapshot("post_failure", &failure_actor);
match fail_result {
Ok(_) => {
assert!(log.assert_transfer_state(
"failed_state",
TransferState::Failed,
&failure_actor
));
}
Err(e) => {
eprintln!("Transfer failure handling failed: {:?}", e);
}
}
log.phase("teardown");
log.test_end("pass");
}
}