use std::sync::Arc;
use tokio::sync::Mutex;
use std::time::{Duration, Instant};
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::chain::ChainAssembler;
use crate::ocel::{build_event, object_ref, SeqCounter};
use crate::verifier::verify;
use crate::types::Blake3Hash;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
const NUM_PIPELINES: usize = 10_000;
const EVENTS_PER_PIPELINE: usize = 10;
println!("\x1b[1;35m--- GLOBAL SWARM E2E HARNESS ---\x1b[0m");
println!("Target Scale: {} parallel pipelines", NUM_PIPELINES);
println!("Load Depth: {} events per pipeline", EVENTS_PER_PIPELINE);
println!("Total Events: {}", NUM_PIPELINES * EVENTS_PER_PIPELINE);
println!("----------------------------------");
let swarm_start = Instant::now();
let completed = Arc::new(AtomicUsize::new(0));
let contention_registry = Arc::new(Mutex::new(Vec::new()));
let total_wait_micros = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::with_capacity(NUM_PIPELINES);
for i in 0..NUM_PIPELINES {
let completed = Arc::clone(&completed);
let contention_registry = Arc::clone(&contention_registry);
let total_wait_micros = Arc::clone(&total_wait_micros);
let handle = tokio::spawn(async move {
let mut counter = SeqCounter::new();
let mut asm = ChainAssembler::new();
for j in 0..EVENTS_PER_PIPELINE {
let event = build_event(
format!("swarm.op.{}", j),
vec![object_ref(format!("artifact.{}", i), "artifact")],
format!("payload.data.{}.{}", i, j).as_bytes(),
&mut counter,
).expect("EMIT failure");
asm.append(event).expect("ASSEMBLE failure");
}
let receipt = asm.finalize();
let final_hash = receipt.chain_hash.clone();
let verdict = verify(&receipt);
if !verdict.accepted {
panic!("VERIFY failure in pipeline {}: {}", i, verdict.reason);
}
let wait_start = Instant::now();
{
let mut registry = contention_registry.lock().await;
let wait_duration = wait_start.elapsed();
total_wait_micros.fetch_add(wait_duration.as_micros() as usize, Ordering::Relaxed);
registry.push(final_hash);
}
completed.fetch_add(1, Ordering::SeqCst);
receipt.chain_hash
});
handles.push(handle);
}
let mut swarm_hashes = Vec::with_capacity(NUM_PIPELINES);
for handle in handles {
swarm_hashes.push(handle.await?);
}
let duration = swarm_start.elapsed();
let total_done = completed.load(Ordering::SeqCst);
let avg_wait = total_wait_micros.load(Ordering::Relaxed) as f64 / total_done as f64;
println!("\x1b[1;32mSwarm Mission Accomplished.\x1b[0m");
println!("Wall Time: {:?}", duration);
println!("Throughput: {:.2} pipelines/sec", total_done as f64 / duration.as_secs_f64());
println!("Aggregate TP: {:.2} events/sec", (total_done * EVENTS_PER_PIPELINE) as f64 / duration.as_secs_f64());
println!("Lock Contention: {:.2} µs (avg wait)", avg_wait);
println!("\nValidating cross-thread determinism...");
let det_start = Instant::now();
let mut det_handles = Vec::new();
for _ in 0..100 {
det_handles.push(tokio::spawn(async move {
let mut counter = SeqCounter::new();
let mut asm = ChainAssembler::new();
let ev = build_event("const", vec![], b"fixed_payload", &mut counter).unwrap();
asm.append(ev).unwrap();
asm.finalize().chain_hash
}));
}
let base_hash = det_handles.remove(0).await?;
for (idx, h) in det_handles.into_iter().enumerate() {
let h_val = h.await?;
if h_val != base_hash {
println!("\x1b[1;31mDETERMINISM BREAK\x1b[0m at task {}: {} != {}", idx + 1, h_val, base_hash);
std::process::exit(1);
}
}
println!("Determinism Check: \x1b[1;32mPASSED\x1b[0m (100/100 coherent) in {:?}", det_start.elapsed());
println!("----------------------------------");
Ok(())
}