use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use ed25519_dalek::SigningKey;
use super::config::{self, InstanceState};
use super::provision;
use crate::miner::collect;
/// Running totals accumulated by [`run`] over the lifetime of one dispatcher session.
///
/// `Default` yields an all-zero summary, which is the state `run` starts from.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DispatchSummary {
    /// Number of dispatch cycles that were started.
    pub cycles: u64,
    /// Sum of the per-instance counts returned by the sync step across all cycles.
    pub total_synced: usize,
    /// Sum of the `submitted` counts reported by the collect step.
    pub total_submitted: usize,
    /// Sum of the `already_accepted` counts reported by the collect step.
    pub total_already: usize,
    /// Sum of the `failed` counts reported by the collect step.
    pub total_failed: usize,
}
/// Runs the solution dispatcher loop until it is interrupted or no instances remain.
///
/// Every cycle:
/// 1. loads the instance list via `config::load_instances`,
/// 2. calls `sync_instance` for each instance and sums the returned counts,
/// 3. calls `collect::run(server_url, verbose)` and folds its `submitted`,
///    `already_accepted` and `failed` counts into the running totals,
/// 4. prints a status line, then waits `poll_secs` seconds before the next cycle
///    while checking the SIGINT flag at least every 500 ms.
///
/// The loop ends when SIGINT has been received or when the instance list is empty.
/// A final summary is printed and returned.
///
/// # Errors
///
/// An error from `collect::run` is propagated immediately; in that case the final
/// summary is neither printed nor returned.
pub fn run(
    ssh_key: &SigningKey,
    server_url: &str,
    poll_secs: u64,
    verbose: bool,
) -> anyhow::Result<DispatchSummary> {
    use std::io::Write;

    /// Longest single sleep while waiting, so a stop request is noticed promptly.
    const STOP_POLL: Duration = Duration::from_millis(500);

    let stop = Arc::new(AtomicBool::new(false));
    // Best-effort: without the handler Ctrl+C falls back to the platform default,
    // so a registration failure is reported but does not abort the dispatcher.
    if let Err(err) = signal_hook::flag::register(signal_hook::consts::SIGINT, Arc::clone(&stop)) {
        eprintln!("Warning: could not install Ctrl+C handler: {err}");
    }

    let interval = Duration::from_secs(poll_secs);
    let mut summary = DispatchSummary {
        cycles: 0,
        total_synced: 0,
        total_submitted: 0,
        total_already: 0,
        total_failed: 0,
    };

    println!("Solution dispatcher started (poll every {poll_secs}s, 4 submission threads)");
    println!("Press Ctrl+C to stop.\n");

    loop {
        if stop.load(Ordering::Relaxed) {
            println!("\nDispatcher interrupted.");
            break;
        }

        // NOTE(review): a failed load is indistinguishable from "no instances" here,
        // because the fallback is the default value (presumably an empty list).
        let instances = config::load_instances().unwrap_or_default();
        if instances.is_empty() {
            println!("No active instances — dispatcher exiting.");
            break;
        }

        summary.cycles += 1;
        let cycle_start = Instant::now();

        let cycle_synced: usize = instances
            .iter()
            .map(|inst| sync_instance(ssh_key, inst))
            .sum();
        summary.total_synced += cycle_synced;

        let cr = collect::run(server_url, verbose)?;
        let s = cr.submitted;
        let a = cr.already_accepted;
        let f = cr.failed;
        summary.total_submitted += s;
        summary.total_already += a;
        summary.total_failed += f;

        let elapsed = cycle_start.elapsed();
        let inst_count = instances.len();
        if cycle_synced > 0 || s > 0 || f > 0 {
            println!(
                "[cycle {}] {} instance(s) | synced: {} | submitted: {} | already: {} | failed: {} | {:.1}s",
                summary.cycles, inst_count, cycle_synced, s, a, f, elapsed.as_secs_f64()
            );
        } else {
            // Idle cycles rewrite a single status line in place instead of scrolling.
            print!(
                "\r[cycle {}] {} instance(s) | no new solutions | {:.1}s ",
                summary.cycles,
                inst_count,
                elapsed.as_secs_f64()
            );
            let _ = std::io::stdout().flush();
        }

        // Wait out the poll interval in short slices. Measuring elapsed time (rather
        // than computing `Instant::now() + interval`) cannot overflow for huge
        // `poll_secs`, and clamping each sleep to the remaining time avoids
        // overshooting the interval by up to one full slice.
        let wait_start = Instant::now();
        while !stop.load(Ordering::Relaxed) {
            match interval.checked_sub(wait_start.elapsed()) {
                Some(remaining) if remaining > Duration::ZERO => {
                    std::thread::sleep(remaining.min(STOP_POLL));
                }
                _ => break,
            }
        }
    }

    println!();
    println!("Dispatch summary:");
    println!(" Cycles: {}", summary.cycles);
    println!(" Synced: {}", summary.total_synced);
    println!(" Submitted: {}", summary.total_submitted);
    println!(" Already: {}", summary.total_already);
    println!(" Failed: {}", summary.total_failed);
    Ok(summary)
}
/// Syncs a single instance by delegating to `provision::append_remote_logs`
/// with the instance's SSH host and port.
///
/// Returns whatever count that helper reports; the caller adds it to the
/// per-cycle "synced" total.
fn sync_instance(ssh_key: &SigningKey, inst: &InstanceState) -> usize {
    let host = &inst.ssh_host;
    let port = inst.ssh_port;
    provision::append_remote_logs(ssh_key, host, port)
}