use std::{
collections::BTreeMap,
path::PathBuf,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use clap::Parser;
use console::style;
use dashmap::DashMap;
use eyre::{Context, Result};
use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle};
use simulator_api::{BacktestResponse, BacktestStatus};
use simulator_client::{
BacktestClient, BacktestClientError, Continue, CreateSession, ReadyOutcome, SubscriptionHandle,
split_range,
};
use solana_address::Address;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use crate::{
output::{AccountDiffRow, SimulationMetadata, SimulationOutput, Transaction},
signals::spawn_ctrlc_cursor_fix,
subscription::{set_up_account_diff_sub, set_up_log_sub},
};
/// Number of semaphore permits handed out in parallel mode: at most this many
/// session tasks create and drive a session at the same time.
const MAX_PARALLEL_SESSIONS: usize = 10;
/// Seconds allowed for a newly created session to become ready. The same value
/// is sent with the create request as the capacity wait timeout.
const CREATION_TIMEOUT_SECS: u64 = 900;
/// Seconds used by `wait_until_ready` both as the per-event wait and as the
/// slot-silence threshold after which a "waiting" message is shown.
const WAITING_FOR_NEXT_CONTINUE_AFTER_SECS: u64 = 10;
// Kind of subscription opened for every `--program-id` on each session.
#[derive(Clone, Debug, PartialEq, Eq, clap::ValueEnum)]
pub enum SubscriptionType {
// Log subscription (`set_up_log_sub`); also what is used when no
// `--subscription` is given.
Logs,
// Account-diff subscription (`set_up_account_diff_sub`); the only kind that
// receives the optional stream-file sender.
AccountDiff,
}
/// Shared destinations for subscription output; cloned into every session task.
#[derive(Clone)]
struct OutputSink {
/// Transactions collected across all sessions, keyed by a string
/// (presumably the transaction signature — confirm in `subscription`).
records: Arc<DashMap<String, Transaction>>,
/// Sender feeding the stream-file writer task; `None` when no
/// `--stream-file` was requested.
stream_tx: Option<tokio::sync::mpsc::UnboundedSender<AccountDiffRow>>,
}
/// Slot range handled by one session. Progress bars are sized as
/// `end - start + 1`, i.e. both ends are counted.
#[derive(Clone, Copy)]
struct SlotRange {
/// First slot of the range.
start: u64,
/// Last slot of the range.
end: u64,
}
// Command-line / environment configuration for a simulation run.
#[derive(Parser, Debug, Clone)]
pub struct RunArgs {
// Simulator endpoint. `run` appends `/backtest` and prefixes `wss://` when
// the value does not already start with `ws://` or `wss://`.
#[arg(
long,
env = "SIMULATOR_URL",
default_value = "simulator.termina.technology"
)]
pub url: String,
// API key handed to the `BacktestClient` builder.
#[arg(long, env = "SIMULATOR_API_KEY")]
pub api_key: String,
// First slot of the range to simulate.
#[arg(long, env = "SIMULATOR_START_SLOT")]
pub start_slot: u64,
// Last slot of the range to simulate.
#[arg(long, env = "SIMULATOR_END_SLOT")]
pub end_slot: u64,
// Slots to advance per continue request; when omitted, `drive_session`
// uses the size of the session's whole range.
#[arg(long, env = "SIMULATOR_ADVANCE_COUNT")]
pub advance_count: Option<u64>,
// Split the range into sub-ranges and drive one session per sub-range,
// at most `MAX_PARALLEL_SESSIONS` at a time.
#[arg(long, env = "SIMULATOR_PARALLEL", default_value_t = false)]
pub parallel: bool,
// Program ids to subscribe to; paired positionally with `program_so`.
#[arg(long, env = "SIMULATOR_PROGRAM_ID")]
pub program_id: Vec<String>,
// Program binaries read from disk and injected for the program id at the
// same position. When non-empty, the count must equal `program_id`'s.
#[arg(long, env = "SIMULATOR_PROGRAM_SO")]
pub program_so: Vec<PathBuf>,
// Path of the JSON report; defaults to
// `<start>_<end>_<override|baseline>.json`.
#[arg(long, env = "SIMULATOR_OUTPUT_FILE")]
pub output_file: Option<PathBuf>,
// Comma-separated program addresses sent as `preload_programs` when
// creating a session.
#[arg(long, env = "SIMULATOR_PRELOAD_PROGRAMS", value_delimiter = ',')]
pub preload_programs: Vec<Address>,
// Comma-separated values sent as `preload_account_bundles` when creating a
// session.
#[arg(long, env = "SIMULATOR_PRELOAD_BUNDLES", value_delimiter = ',')]
pub preload_bundles: Vec<String>,
// When set and `preload_programs` is non-empty, wait for the
// `ProgramAccountsLoaded` status before sending the first continue.
#[arg(long, env = "SIMULATOR_WAIT_FOR_PRELOAD", default_value_t = false)]
pub wait_for_preload: bool,
// Optional value forwarded to session creation as extra compute units.
#[arg(long, env = "SIMULATOR_EXTRA_COMPUTE_UNITS")]
pub extra_compute_units: Option<u32>,
// Subscription kind to open per program id; log subscriptions are used
// when omitted.
#[arg(long, env = "SIMULATOR_SUBSCRIPTION", requires = "program_id")]
pub subscription: Option<SubscriptionType>,
// When set, account-diff rows are additionally written to this file as
// one JSON document per line.
#[arg(long, env = "SIMULATOR_STREAM_FILE", requires = "subscription")]
pub stream_file: Option<PathBuf>,
// Initialise tracing output and hide the progress bars.
#[arg(long, default_value_t = false)]
pub verbose: bool,
// Include "waited … for placement" in progress-bar messages.
#[arg(long, env = "SIMULATOR_DEBUG_PLACEMENT_WAIT", default_value_t = false)]
pub debug_placement_wait: bool,
}
/// Checks argument combinations that clap cannot express on its own.
///
/// # Errors
/// Fails when `--program-so` is given but its count differs from the number of
/// `--program-id` values (the two lists are paired positionally later on).
fn validate_args(args: &RunArgs) -> Result<()> {
    let so_count = args.program_so.len();
    if so_count == 0 {
        // No overrides requested: nothing to cross-check.
        return Ok(());
    }
    let id_count = args.program_id.len();
    eyre::ensure!(
        so_count == id_count,
        "--program-so count ({}) must match --program-id count ({})",
        so_count,
        id_count,
    );
    Ok(())
}
/// Runs the simulation described by `args` and writes the JSON report.
///
/// Steps:
/// 1. validate arguments and derive the output path and WebSocket URL;
/// 2. optionally start the stream-file writer task;
/// 3. either split the range and drive up to `MAX_PARALLEL_SESSIONS` sessions
///    concurrently (`--parallel`), or drive a single session;
/// 4. wait for the stream writer, sort the collected transactions by
///    `(slot, signature)` and write the report to disk.
///
/// # Errors
/// Returns the first session error (after all parallel tasks have finished),
/// a task join failure, a stream-writer failure, or a serialization / file
/// write failure. No report is written when a session fails.
///
/// # Panics
/// Panics if a freshly created session reports no session id.
pub async fn run(args: RunArgs, cancellation: CancellationToken) -> Result<()> {
    validate_args(&args)?;

    let output_file = args.output_file.clone().unwrap_or_else(|| {
        let suffix = if !args.program_so.is_empty() {
            "override"
        } else {
            "baseline"
        };
        PathBuf::from(format!(
            "{}_{}_{}.json",
            args.start_slot, args.end_slot, suffix
        ))
    });

    // Trailing slashes are stripped in both branches so that `host/` does not
    // turn into `wss://host//backtest`.
    let endpoint = args.url.trim_end_matches('/');
    let base_url = if args.url.starts_with("ws://") || args.url.starts_with("wss://") {
        format!("{endpoint}/backtest")
    } else {
        format!("wss://{endpoint}/backtest")
    };
    let client = BacktestClient::builder()
        .url(base_url)
        .api_key(args.api_key.clone())
        .build();

    let (stream_tx, stream_writer_handle) = if let Some(ref stream_path) = args.stream_file {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<AccountDiffRow>();
        let handle = spawn_stream_writer(stream_path.clone(), rx);
        (Some(tx), Some(handle))
    } else {
        (None, None)
    };
    let output = OutputSink {
        records: Arc::new(DashMap::new()),
        stream_tx,
    };
    let session_ids: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let args = Arc::new(args);

    spawn_ctrlc_cursor_fix();
    if args.verbose {
        tracing_subscriber::fmt()
            .with_env_filter(
                tracing_subscriber::EnvFilter::try_from_default_env()
                    .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
            )
            .init();
    }

    if args.parallel {
        let progress = MultiProgress::new();
        let ranges = client
            .available_ranges()
            .await
            .map_err(|e| eyre::eyre!("failed to fetch available ranges: {e}"))?;
        let sub_ranges = split_range(&ranges, args.start_slot, args.end_slot)
            .map_err(|e| eyre::eyre!("cannot split range: {e}"))?;
        info!("splitting into {} parallel session(s)", sub_ranges.len());
        let _ = progress.println(format!(
            "{} Splitting into {} parallel session(s)",
            style(">").cyan(),
            sub_ranges.len(),
        ));

        // One bar per sub-range; hidden in verbose mode so log lines are not
        // interleaved with bar redraws.
        let bars: Vec<ProgressBar> = sub_ranges
            .iter()
            .map(|&(start, end)| {
                if args.verbose {
                    return ProgressBar::hidden();
                }
                let bar = ProgressBar::new(end.saturating_sub(start).saturating_add(1));
                bar.set_style(spinner_style(start, end));
                let bar = progress.add(bar);
                bar.enable_steady_tick(Duration::from_millis(100));
                bar
            })
            .collect();

        let semaphore = Arc::new(Semaphore::new(MAX_PARALLEL_SESSIONS));
        let mut join_set = tokio::task::JoinSet::new();
        for ((start, end), bar) in sub_ranges.into_iter().zip(bars) {
            let client = client.clone();
            let args = args.clone();
            let output = output.clone();
            let session_ids = session_ids.clone();
            let cancellation = cancellation.clone();
            let semaphore = semaphore.clone();
            join_set.spawn(async move {
                // The permit is held for the whole lifetime of the session.
                let _permit = semaphore.acquire_owned().await?;
                let request_started = Instant::now();
                let create = CreateSession::builder()
                    .start_slot(start)
                    .end_slot(end)
                    .preload_programs(args.preload_programs.iter().copied().collect())
                    .preload_account_bundles(args.preload_bundles.clone())
                    .disconnect_timeout_secs(180)
                    .capacity_wait_timeout_secs(CREATION_TIMEOUT_SECS as u16)
                    .maybe_extra_compute_units(args.extra_compute_units)
                    .build();
                let session = client
                    .create_session(create)
                    .await
                    .map_err(|e| eyre::eyre!("failed to create session {start}-{end}: {e}"))?;
                let session_id = session
                    .session_id()
                    .expect("a session returned by create_session has a session id")
                    .to_string();
                session_ids.lock().unwrap().push(session_id.clone());
                info!(session_id, start, end, "session created");
                drive_session(
                    session,
                    &client,
                    &args,
                    &bar,
                    SlotRange { start, end },
                    request_started,
                    &output,
                    &cancellation,
                )
                .await
                .inspect_err(|e| bar.abandon_with_message(format!("failed: {e}")))
            });
        }

        // Let every task finish; only the first session error is reported.
        let mut first_error: Option<eyre::Report> = None;
        while let Some(result) = join_set.join_next().await {
            let result = result.map_err(|e| eyre::eyre!("session task panicked: {e}"))?;
            if let Err(e) = result
                && first_error.is_none()
            {
                first_error = Some(e);
            }
        }
        if let Some(e) = first_error {
            return Err(e);
        }
    } else {
        let create = CreateSession::builder()
            .start_slot(args.start_slot)
            .end_slot(args.end_slot)
            .preload_programs(args.preload_programs.iter().copied().collect())
            .preload_account_bundles(args.preload_bundles.clone())
            .disconnect_timeout_secs(180)
            .capacity_wait_timeout_secs(CREATION_TIMEOUT_SECS as u16)
            .maybe_extra_compute_units(args.extra_compute_units)
            .build();
        let request_started = Instant::now();
        let session = client
            .create_session(create)
            .await
            .map_err(|e| eyre::eyre!("failed to create session: {e}"))?;
        let session_id = session
            .session_id()
            .expect("a session returned by create_session has a session id")
            .to_string();
        session_ids.lock().unwrap().push(session_id.clone());
        info!(
            session_id,
            start = args.start_slot,
            end = args.end_slot,
            "session created"
        );
        let range = SlotRange {
            start: args.start_slot,
            end: args.end_slot,
        };
        let bar = if args.verbose {
            ProgressBar::hidden()
        } else {
            let bar = ProgressBar::new(range.end.saturating_sub(range.start).saturating_add(1));
            bar.set_style(spinner_style(range.start, range.end));
            bar.set_message("starting session...");
            bar.enable_steady_tick(Duration::from_millis(100));
            bar
        };
        drive_session(
            session,
            &client,
            &args,
            &bar,
            range,
            request_started,
            &output,
            &cancellation,
        )
        .await
        .inspect_err(|e| bar.abandon_with_message(format!("failed: {e}")))?;
    }

    // Dropping the last sender closes the channel, which lets the stream
    // writer drain, flush and exit.
    let OutputSink { records, stream_tx } = output;
    drop(stream_tx);
    if let Some(handle) = stream_writer_handle {
        handle
            .await
            .context("stream writer task panicked")?
            .context("stream writer failed")?;
    }

    let session_ids = session_ids.lock().unwrap().clone();
    // Fall back to cloning the map if some other holder still keeps the Arc.
    let records_map = Arc::try_unwrap(records).unwrap_or_else(|arc| (*arc).clone());
    let mut records: Vec<Transaction> = records_map.into_iter().map(|(_, tx)| tx).collect();
    records.sort_by(|a, b| a.slot.cmp(&b.slot).then(a.signature.cmp(&b.signature)));

    let metadata = SimulationMetadata {
        start_slot: args.start_slot,
        end_slot: args.end_slot,
        program_ids: args.program_id.clone(),
        program_so: args
            .program_so
            .iter()
            .map(|p| p.display().to_string())
            .collect(),
        ran_at_unix_secs: std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0),
        session_ids,
    };
    let output = SimulationOutput::build(metadata, records);
    let json =
        serde_json::to_string_pretty(&output).context("failed to serialize output to JSON")?;
    std::fs::write(&output_file, &json)
        .with_context(|| format!("failed to write {}", output_file.display()))?;

    println!("\n{} Simulation complete", style("✔").green());
    println!("  Transactions : {}", output.summary.total_transactions);
    println!("  Successes    : {}", output.summary.successes);
    println!("  Failures     : {}", output.summary.failures);
    println!("  Output file  : {}", output_file.display());
    Ok(())
}
/// Maximum number of reattach attempts `drive_session` makes after connection
/// errors; the counter is reset whenever a continue request succeeds.
const MAX_RECONNECTS: u32 = 3;
fn is_connection_error(e: &BacktestClientError) -> bool {
matches!(
e,
BacktestClientError::Closed { .. }
| BacktestClientError::WebSocket { .. }
| BacktestClientError::Timeout { .. }
)
}
/// Drives one already-created session from "waiting for placement" to
/// completion, cancellation or failure.
///
/// Phases:
/// 1. wait (up to `CREATION_TIMEOUT_SECS`) for the session to become ready;
/// 2. optionally wait for preloaded program accounts;
/// 3. build the program injections from `--program-so` and start the
///    subscriptions;
/// 4. repeatedly send continue requests until the session completes or
///    `cancellation` fires, reattaching up to `MAX_RECONNECTS` times in a row
///    on connection errors;
/// 5. stop the subscriptions and close the session.
///
/// Returns `Ok(())` — after updating the progress bar — when the session is
/// skipped for lack of capacity, when the server reports an error during
/// start-up, when cancelled, and on normal completion.
///
/// # Errors
/// Returns other start-up errors, preload / injection / subscription set-up
/// failures, unrecoverable errors from the continue loop, and subscription
/// shutdown failures. Once subscriptions have been started, the shutdown in
/// phase 5 runs on the error path too.
///
/// # Panics
/// Panics if `session` has no session id; `run` only passes sessions whose id
/// it has already read.
// One argument per collaborator keeps the call sites explicit; bundling them
// into a struct would not remove any coupling.
#[allow(clippy::too_many_arguments)]
async fn drive_session(
    mut session: simulator_client::BacktestSession,
    client: &BacktestClient,
    args: &RunArgs,
    pb: &ProgressBar,
    range: SlotRange,
    request_started: Instant,
    output: &OutputSink,
    cancellation: &CancellationToken,
) -> Result<()> {
    let SlotRange { start, end } = range;
    let advance_count = args
        .advance_count
        .unwrap_or(end.saturating_sub(start).saturating_add(1));
    let session_id = session
        .session_id()
        .expect("drive_session is only called with a created session")
        .to_string();
    let mut reconnect_attempts = 0u32;
    // Time that elapsed between issuing the create request and getting here.
    let placement_wait = request_started.elapsed();

    // Phase 1: wait until the session is ready, or bail out on cancellation.
    let ready_outcome = tokio::select! {
        r = wait_until_ready(
            &mut session,
            pb,
            start,
            Some(Duration::from_secs(CREATION_TIMEOUT_SECS)),
            "starting session",
            None,
            false,
        ) => {
            match r {
                Ok(outcome) => {
                    pb.set_message(progress_message(
                        "",
                        Some(placement_wait),
                        args.debug_placement_wait,
                    ));
                    Some(outcome)
                },
                Err(BacktestClientError::Timeout { .. }) => {
                    tracing::warn!(start, end, "no capacity after {CREATION_TIMEOUT_SECS}s, skipping");
                    pb.abandon_with_message(format!(
                        "no capacity after {CREATION_TIMEOUT_SECS}s, skipped"
                    ));
                    session.close(Some(Duration::from_secs(10))).await.ok();
                    return Ok(());
                },
                Err(BacktestClientError::Remote(err)) => {
                    pb.abandon_with_message(format!("server error: {err}"));
                    session.close(Some(Duration::from_secs(10))).await.ok();
                    return Ok(());
                },
                Err(e) => {
                    return Err(eyre::Report::new(e));
                }
            }
        }
        _ = cancellation.cancelled() => None,
    };
    match ready_outcome {
        None => {
            info!(start, end, "cancelled before ready");
            pb.abandon_with_message("cancelled");
            session.close(Some(Duration::from_secs(10))).await.ok();
            return Ok(());
        }
        Some(ReadyOutcome::Completed) => {
            info!(start, end, "done (empty range)");
            pb.finish_with_message("done (empty range)");
            return Ok(());
        }
        Some(ReadyOutcome::Ready) => {
            info!(start, end, "session ready, starting");
        }
    }

    // Phase 2: optional preload barrier.
    if args.wait_for_preload && !args.preload_programs.is_empty() {
        info!(start, end, "loading program accounts");
        pb.set_message(progress_message(
            "loading program accounts",
            Some(placement_wait),
            args.debug_placement_wait,
        ));
        session
            .wait_for_status(BacktestStatus::ProgramAccountsLoaded, None)
            .await?;
    }

    // Switch from the spinner to a real progress bar now that slots will flow.
    pb.set_style(bar_style(start, end));
    pb.reset_elapsed();
    pb.set_position(0);
    pb.set_message(progress_message(
        "",
        Some(placement_wait),
        args.debug_placement_wait,
    ));

    // Phase 3: program injections and subscriptions.
    let mut modifications = BTreeMap::new();
    for (id, path) in args.program_id.iter().zip(&args.program_so) {
        let elf =
            std::fs::read(path).with_context(|| format!("failed to read {}", path.display()))?;
        let mods = session
            .modify_program(id, &elf)
            .await
            .map_err(|e| eyre::eyre!("failed to build program injection for {id}: {e}"))?;
        modifications.extend(mods);
    }
    let mut sub_tasks = ActiveSubscriptions::start(&mut session, args, output).await?;
    // The injections are sent with the first continue only.
    let mut pending_mods = Some(modifications);

    // Phase 4: continue loop. It yields a `Result` instead of returning early
    // so that the shutdown below also runs when the loop fails.
    let loop_result: Result<()> = loop {
        if cancellation.is_cancelled() {
            break Ok(());
        }
        if !session.is_ready_for_continue() {
            let ready_outcome = tokio::select! {
                r = wait_until_ready(
                    &mut session,
                    pb,
                    start,
                    None,
                    "waiting for next continue",
                    Some(placement_wait),
                    args.debug_placement_wait,
                ) => Some(r),
                _ = cancellation.cancelled() => None,
            };
            match ready_outcome {
                None => break Ok(()),
                Some(Ok(ReadyOutcome::Completed)) => break Ok(()),
                Some(Ok(ReadyOutcome::Ready)) => {}
                Some(Err(e)) if is_connection_error(&e) && reconnect_attempts < MAX_RECONNECTS => {
                    reconnect_attempts += 1;
                    warn!(start, end, error = %e, attempt = reconnect_attempts, "connection lost while waiting, reattaching");
                    tokio::time::sleep(Duration::from_secs(2)).await;
                    if let Err(e) = reattach_session_with_subscriptions(
                        &mut session,
                        client,
                        &session_id,
                        args,
                        output,
                        &mut sub_tasks,
                        cancellation,
                    )
                    .await
                    {
                        break Err(e);
                    }
                    continue;
                }
                Some(Err(e)) => break Err(eyre::Report::new(e)),
            }
        }

        let mods = pending_mods.take().unwrap_or_default();
        let cont = Continue::builder()
            .advance_count(advance_count)
            .modify_accounts(mods)
            .build();
        info!(
            start,
            end,
            advance_count = advance_count,
            "sending continue"
        );
        let outcome = tokio::select! {
            r = session.continue_until_ready(cont, Some(Duration::new(30, 0)), |event| {
                if let simulator_api::BacktestResponse::SlotNotification(slot) = event {
                    tracing::debug!(slot, "slot notification: {slot}");
                    update_progress_position(pb, start, *slot);
                }
            }) => Some(r),
            _ = cancellation.cancelled() => None,
        };
        let Some(result) = outcome else { break Ok(()) };
        match result {
            Ok(r) => {
                info!(start, end, completed = r.completed, "continue returned");
                reconnect_attempts = 0;
                if r.completed {
                    break Ok(());
                }
            }
            Err(e) if is_connection_error(&e) && reconnect_attempts < MAX_RECONNECTS => {
                reconnect_attempts += 1;
                warn!(start, end, error = %e, attempt = reconnect_attempts, "connection lost during continue, reattaching");
                tokio::time::sleep(Duration::from_secs(2)).await;
                if let Err(e) = reattach_session_with_subscriptions(
                    &mut session,
                    client,
                    &session_id,
                    args,
                    output,
                    &mut sub_tasks,
                    cancellation,
                )
                .await
                {
                    break Err(e);
                }
            }
            Err(e) => break Err(eyre::Report::new(e)),
        }
    };

    // On failure the caller puts the error on the bar; only report
    // "cancelled" / "done" when the loop itself ended cleanly.
    if loop_result.is_ok() {
        if cancellation.is_cancelled() {
            info!(start, end, "cancelled");
            pb.abandon_with_message(progress_message(
                "cancelled",
                Some(placement_wait),
                args.debug_placement_wait,
            ));
        } else {
            info!(start, end, "done");
            pb.finish_with_message(progress_message(
                "done",
                Some(placement_wait),
                args.debug_placement_wait,
            ));
        }
    }

    // Phase 5: always stop the subscriptions and close the session, even if
    // the loop or the subscription shutdown failed. A loop error takes
    // precedence over a shutdown error.
    let stop_result = sub_tasks.stop(cancellation).await;
    session.close(Some(Duration::from_secs(10))).await.ok();
    loop_result.and(stop_result)
}
struct ActiveSubscriptions(Vec<SubscriptionHandle>);
impl ActiveSubscriptions {
async fn start(
session: &mut simulator_client::BacktestSession,
args: &RunArgs,
output: &OutputSink,
) -> Result<Self> {
let mut handles = Vec::new();
for program_id in &args.program_id {
match args.subscription {
Some(SubscriptionType::AccountDiff) => {
handles.extend(
set_up_account_diff_sub(
session,
program_id,
output.records.clone(),
output.stream_tx.clone(),
)
.await?,
);
}
Some(SubscriptionType::Logs) | None => {
handles
.push(set_up_log_sub(session, program_id, output.records.clone()).await?);
}
}
}
Ok(Self(handles))
}
async fn stop(&mut self, cancellation: &CancellationToken) -> Result<()> {
for handle in self.0.drain(..) {
handle.stop.send(true).ok();
let mut join = handle.join_handle;
tokio::select! {
result = &mut join => {
match result {
Ok(Ok(())) => {}
Ok(Err(error)) => {
return Err(eyre::eyre!("subscription task failed: {error}"));
}
Err(error) => {
return Err(eyre::eyre!("subscription task panicked: {error}"));
}
}
}
_ = tokio::time::sleep(Duration::from_secs(30)) => {
join.abort();
return Err(eyre::eyre!("timed out waiting for subscription task to drain"));
}
_ = cancellation.cancelled() => {
join.abort();
}
}
}
Ok(())
}
}
/// Replaces the subscriptions of a lost connection with fresh ones on
/// `reattached_session`.
///
/// The old tasks in `sub_tasks` are stopped first; only if that succeeds are
/// new subscriptions started and returned.
async fn rebuild_subscriptions_for_reattach(
    reattached_session: &mut simulator_client::BacktestSession,
    args: &RunArgs,
    output: &OutputSink,
    sub_tasks: &mut ActiveSubscriptions,
    cancellation: &CancellationToken,
) -> Result<ActiveSubscriptions> {
    sub_tasks.stop(cancellation).await?;
    let rebuilt = ActiveSubscriptions::start(reattached_session, args, output).await?;
    Ok(rebuilt)
}
/// Asks a reattached session to resume, wrapping any failure in an error that
/// names `session_id`.
async fn resume_reattached_session(
    reattached_session: &mut simulator_client::BacktestSession,
    session_id: &str,
) -> Result<()> {
    match reattached_session.resume_attached_session().await {
        Ok(()) => Ok(()),
        Err(e) => Err(eyre::eyre!(
            "failed to resume reattached session {session_id}: {e}"
        )),
    }
}
/// Reconnects to `session_id` after a connection loss and swaps the new
/// session and its subscriptions into `session` / `sub_tasks`.
///
/// Order of operations: attach (passing the old session's last sequence),
/// rebuild subscriptions on the new connection, resume the session, and only
/// then replace the caller's state. If any step fails, `session` is left
/// untouched.
// Mirrors `drive_session`'s parameter list; see the rationale there.
#[allow(clippy::too_many_arguments)]
async fn reattach_session_with_subscriptions(
    session: &mut simulator_client::BacktestSession,
    client: &BacktestClient,
    session_id: &str,
    args: &RunArgs,
    output: &OutputSink,
    sub_tasks: &mut ActiveSubscriptions,
    cancellation: &CancellationToken,
) -> Result<()> {
    let resume_from = session.last_sequence();
    let mut fresh_session = match client.attach_session(session_id, resume_from).await {
        Ok(attached) => attached,
        Err(e) => {
            return Err(eyre::eyre!(
                "failed to reattach to session {session_id}: {e}"
            ));
        }
    };

    let fresh_subscriptions = rebuild_subscriptions_for_reattach(
        &mut fresh_session,
        args,
        output,
        sub_tasks,
        cancellation,
    )
    .await?;
    resume_reattached_session(&mut fresh_session, session_id).await?;

    *sub_tasks = fresh_subscriptions;
    *session = fresh_session;
    Ok(())
}
async fn wait_until_ready(
session: &mut simulator_client::BacktestSession,
pb: &ProgressBar,
start_slot: u64,
timeout: Option<Duration>,
phase: &str,
placement_wait: Option<Duration>,
show_placement_wait: bool,
) -> Result<ReadyOutcome, BacktestClientError> {
let status_wait_timeout = Duration::from_secs(WAITING_FOR_NEXT_CONTINUE_AFTER_SECS);
let wait_deadline = timeout.map(|timeout| Instant::now() + timeout);
let mut last_slot_notification_at = None;
let mut latest_status = None;
if session.is_ready_for_continue() {
pb.set_message(progress_message(
&format!("{phase}: ready"),
placement_wait,
show_placement_wait,
));
return Ok(ReadyOutcome::Ready);
}
loop {
let next_timeout = next_wait_timeout(wait_deadline, status_wait_timeout, phase)?;
let response = match session.next_event(Some(next_timeout)).await {
Ok(Some(response)) => response,
Ok(None) => {
return Err(BacktestClientError::Closed {
reason: format!("websocket ended while {phase}"),
});
}
Err(BacktestClientError::Timeout { .. }) => {
if should_show_waiting_message(last_slot_notification_at, status_wait_timeout) {
pb.set_message(progress_message(
&waiting_message(phase, latest_status.as_ref()),
placement_wait,
show_placement_wait,
));
}
continue;
}
Err(error) => return Err(error),
};
match response {
BacktestResponse::ReadyForContinue => {
pb.set_message(progress_message(
&format!("{phase}: ready"),
placement_wait,
show_placement_wait,
));
return Ok(ReadyOutcome::Ready);
}
BacktestResponse::Completed { .. } => {
pb.set_message(progress_message(
&format!("{phase}: completed"),
placement_wait,
show_placement_wait,
));
return Ok(ReadyOutcome::Completed);
}
BacktestResponse::Error(err) => {
pb.set_message(progress_message(
&format!("{phase}: server error"),
placement_wait,
show_placement_wait,
));
return Err(BacktestClientError::Remote(err));
}
BacktestResponse::Status { status } => {
latest_status = Some(status);
if should_show_waiting_message(last_slot_notification_at, status_wait_timeout) {
pb.set_message(progress_message(
&waiting_message(phase, latest_status.as_ref()),
placement_wait,
show_placement_wait,
));
}
}
BacktestResponse::SlotNotification(slot) => {
update_progress_position(pb, start_slot, slot);
last_slot_notification_at = Some(Instant::now());
pb.set_message(progress_message("", placement_wait, show_placement_wait));
}
BacktestResponse::SessionCreated { session_id, .. } => {
pb.set_message(progress_message(
&format!("{phase}: created {session_id}"),
placement_wait,
show_placement_wait,
));
}
BacktestResponse::SessionAttached {
session_id,
rpc_endpoint,
} => {
pb.set_message(progress_message(
&format!("{phase}: attached {session_id} at {rpc_endpoint}"),
placement_wait,
show_placement_wait,
));
}
BacktestResponse::SessionsCreated { session_ids } => {
pb.set_message(progress_message(
&format!("{phase}: created {} session(s)", session_ids.len()),
placement_wait,
show_placement_wait,
));
}
_ => {}
}
}
}
/// Computes how long the next event poll may block.
///
/// Without a deadline this is simply `status_wait_timeout`; with one it is the
/// time remaining, capped at `status_wait_timeout`.
///
/// # Errors
/// Returns `Timeout` once `wait_deadline` has been reached. Note that the
/// error's `duration` carries `status_wait_timeout` (the poll slice), not the
/// overall timeout that expired.
fn next_wait_timeout(
    wait_deadline: Option<Instant>,
    status_wait_timeout: Duration,
    _phase: &str,
) -> Result<Duration, BacktestClientError> {
    let Some(deadline) = wait_deadline else {
        return Ok(status_wait_timeout);
    };

    let now = Instant::now();
    if now < deadline {
        let remaining = deadline - now;
        Ok(remaining.min(status_wait_timeout))
    } else {
        Err(BacktestClientError::Timeout {
            action: "waiting",
            duration: status_wait_timeout,
        })
    }
}
/// Decides whether a "waiting" message should replace the current bar message.
///
/// It should when no slot notification has been seen yet, or when the last one
/// is at least `status_wait_timeout` old.
fn should_show_waiting_message(
    last_slot_notification_at: Option<Instant>,
    status_wait_timeout: Duration,
) -> bool {
    last_slot_notification_at.map_or(true, |seen_at| seen_at.elapsed() >= status_wait_timeout)
}
/// Builds the waiting text for `phase`: the phase alone, or
/// `"<phase>: <status label>"` when a status has been received.
fn waiting_message(phase: &str, latest_status: Option<&BacktestStatus>) -> String {
    latest_status.map_or_else(
        || phase.to_string(),
        |status| format!("{phase}: {}", backtest_status_label(status)),
    )
}
fn backtest_status_label(status: &BacktestStatus) -> &'static str {
match status {
BacktestStatus::StartingRuntime => "starting runtime",
BacktestStatus::DecodedTransactions => "decoded transactions",
BacktestStatus::AppliedAccountModifications => "applied account modifications",
BacktestStatus::ReadyToExecuteUserTransactions => "ready to execute user transactions",
BacktestStatus::ExecutedUserTransactions => "executed user transactions",
BacktestStatus::ExecutingBlockTransactions => "executing block transactions",
BacktestStatus::ExecutedBlockTransactions => "executed block transactions",
BacktestStatus::ProgramAccountsLoaded => "program accounts loaded",
}
}
/// Spawns the task that writes every received `AccountDiffRow` to `path` as
/// one JSON document per line.
///
/// The file is created (truncating any existing one) when the task starts.
/// Rows that are already queued are written back to back; the writer is
/// flushed whenever the queue runs empty and once more when the channel
/// closes, at which point the task ends.
///
/// The returned handle resolves to an error if the file cannot be created or
/// written.
fn spawn_stream_writer(
    path: PathBuf,
    mut rx: tokio::sync::mpsc::UnboundedReceiver<AccountDiffRow>,
) -> tokio::task::JoinHandle<Result<()>> {
    tokio::spawn(async move {
        use std::io::Write;

        let write_err = || format!("could not write to '{}'", path.display());
        let file = std::fs::File::create(&path).with_context(|| {
            format!(
                "could not create '{}' — check that the directory exists and is writable",
                path.display()
            )
        })?;
        let mut writer = std::io::BufWriter::new(file);

        let mut next_row = rx.recv().await;
        while let Some(row) = next_row {
            serde_json::to_writer(&mut writer, &row).with_context(write_err)?;
            writer.write_all(b"\n").with_context(write_err)?;

            next_row = match rx.try_recv() {
                // More rows are already queued: keep writing without flushing.
                Ok(queued) => Some(queued),
                // Queue empty or closed: flush the batch, then wait for more.
                Err(_) => {
                    writer.flush().with_context(write_err)?;
                    rx.recv().await
                }
            };
        }

        writer
            .flush()
            .with_context(|| format!("could not finalize '{}'", path.display()))?;
        Ok(())
    })
}
/// Moves the bar to the 1-based offset of `slot` within the range starting at
/// `start_slot`; slots before `start_slot` map to position 1.
fn update_progress_position(pb: &ProgressBar, start_slot: u64, slot: u64) {
    let offset = slot.saturating_sub(start_slot);
    pb.set_position(offset.saturating_add(1));
}
/// Style used before slots start flowing: `[start → end]`, a spinner and the
/// current message. Falls back to the default spinner style if the template is
/// rejected.
fn spinner_style(start: u64, end: u64) -> ProgressStyle {
    let base = match ProgressStyle::with_template("[{range}] {spinner:.cyan} {msg}") {
        Ok(parsed) => parsed,
        Err(_) => ProgressStyle::default_spinner(),
    };
    base.with_key(
        "range",
        move |_: &ProgressState, out: &mut dyn std::fmt::Write| {
            let _ = write!(out, "{} → {}", format_slot(start), format_slot(end));
        },
    )
}
/// Builds a progress-bar message, optionally prefixed with the placement wait.
///
/// The prefix `waited <duration> for placement` is added only when
/// `show_placement_wait` is set and a wait is known; otherwise `detail` is
/// returned unchanged. With the prefix and an empty `detail`, the prefix alone
/// is returned.
fn progress_message(
    detail: &str,
    placement_wait: Option<Duration>,
    show_placement_wait: bool,
) -> String {
    match placement_wait {
        Some(wait) if show_placement_wait => {
            let wait = format_compact_duration(wait);
            if detail.is_empty() {
                format!("waited {wait} for placement")
            } else {
                format!("waited {wait} for placement, {detail}")
            }
        }
        _ => detail.to_string(),
    }
}
/// Formats a duration with whole-second precision as `42s`, `1m23s` or
/// `1h02m03s`; every unit after the leading one is zero-padded to two digits.
fn format_compact_duration(duration: Duration) -> String {
    let total = duration.as_secs();
    let (hours, remainder) = (total / 3600, total % 3600);
    let (minutes, seconds) = (remainder / 60, remainder % 60);

    if hours > 0 {
        format!("{hours}h{minutes:02}m{seconds:02}s")
    } else if minutes > 0 {
        format!("{minutes}m{seconds:02}s")
    } else {
        format!("{seconds}s")
    }
}
/// Style used while slots are being processed: `[start → end]`, a 40-column
/// bar, `pos/len slots`, elapsed time and the current message. Falls back to
/// the default bar style if the template is rejected.
fn bar_style(start: u64, end: u64) -> ProgressStyle {
    let template = "[{range}] {bar:40.cyan/blue} {pos}/{len} slots ({elapsed}) {msg}";
    let base = match ProgressStyle::with_template(template) {
        Ok(parsed) => parsed,
        Err(_) => ProgressStyle::default_bar(),
    };
    base.with_key(
        "range",
        move |_: &ProgressState, out: &mut dyn std::fmt::Write| {
            let _ = write!(out, "{} → {}", format_slot(start), format_slot(end));
        },
    )
}
/// Renders a slot number with `,` as thousands separator, e.g. `1,234,567`.
fn format_slot(n: u64) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut grouped = String::with_capacity(len + (len - 1) / 3);
    for (idx, digit) in digits.chars().enumerate() {
        // A separator goes before every digit that starts a full group of
        // three counted from the right, except at the very front.
        if idx > 0 && (len - idx) % 3 == 0 {
            grouped.push(',');
        }
        grouped.push(digit);
    }
    grouped
}
// Unit tests for the pure formatting / progress helpers of this module.
#[cfg(test)]
mod tests {
use super::*;
// With the debug flag off, the placement wait must not appear in the message.
#[test]
fn progress_message_omits_wait_when_debug_disabled() {
let message = progress_message("starting runtime", Some(Duration::from_secs(83)), false);
assert_eq!(message, "starting runtime");
}
// With the debug flag on, the wait is prefixed in compact form.
#[test]
fn progress_message_includes_compact_wait_when_enabled() {
let message = progress_message("starting runtime", Some(Duration::from_secs(83)), true);
assert_eq!(message, "waited 1m23s for placement, starting runtime");
}
// 3723 s = 1 h 2 min 3 s; minutes and seconds are zero-padded.
#[test]
fn format_compact_duration_covers_hours() {
assert_eq!(
format_compact_duration(Duration::from_secs(3723)),
"1h02m03s"
);
}
// Positions are 1-based offsets from the start slot.
#[test]
fn update_progress_position_tracks_slot_offset() {
let pb = ProgressBar::new(100);
update_progress_position(&pb, 362_270_659, 362_270_659);
assert_eq!(pb.position(), 1);
update_progress_position(&pb, 362_270_659, 362_270_700);
assert_eq!(pb.position(), 42);
}
// A known status is appended to the phase after a colon.
#[test]
fn waiting_message_uses_latest_status_when_present() {
assert_eq!(
waiting_message(
"waiting for next continue",
Some(&BacktestStatus::ExecutingBlockTransactions),
),
"waiting for next continue: executing block transactions"
);
}
// No slot seen yet -> show; slot seen just now -> stay quiet.
#[test]
fn should_show_waiting_message_only_after_slot_silence() {
let timeout = Duration::from_secs(WAITING_FOR_NEXT_CONTINUE_AFTER_SECS);
assert!(should_show_waiting_message(None, timeout));
assert!(!should_show_waiting_message(Some(Instant::now()), timeout));
}
}