use std::path::Path;
use std::time::Instant;
use anyhow::{Result, bail};
use dynamo_kv_router::config::KvRouterConfig;
use super::online;
use super::validate::{
validate_offline_concurrency_args, validate_offline_disagg_concurrency_args,
validate_offline_disagg_replay_args, validate_offline_replay_args,
validate_online_concurrency_args, validate_online_replay_args,
};
use super::{
OfflineDisaggReplayConfig, ReplayPrefillLoadEstimator, ReplayRouterMode, ReplayWorkerArtifacts,
TraceSimulationReport,
};
use crate::common::protocols::{DirectRequest, MockEngineArgs};
use crate::loadgen::Trace;
pub fn generate_trace_worker_artifacts_offline(
args: MockEngineArgs,
trace: Trace,
) -> Result<ReplayWorkerArtifacts> {
let args = args.normalized()?;
crate::replay::offline::generate_trace_worker_artifacts(args, trace)
}
pub fn simulate_trace_file(
args: MockEngineArgs,
trace_path: &Path,
trace_block_size: usize,
num_workers: usize,
arrival_speedup_ratio: f64,
) -> Result<TraceSimulationReport> {
simulate_trace_file_with_router_mode(
args,
None,
None,
trace_path,
trace_block_size,
num_workers,
arrival_speedup_ratio,
ReplayRouterMode::RoundRobin,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_trace_file_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
num_workers: usize,
arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_offline_replay_args(&args, num_workers, router_mode)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)?
.normalize_session_starts()?
.speed_up_timing(arrival_speedup_ratio)?;
let started_at = Instant::now();
let report = crate::replay::offline::simulate_trace_workload(
args,
router_config,
prefill_load_estimator,
trace,
num_workers,
router_mode,
)?;
Ok(report.with_wall_time_ms(started_at.elapsed().as_secs_f64() * 1000.0))
}
pub fn simulate_trace_file_disagg_with_router_mode(
config: OfflineDisaggReplayConfig,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let config = config.normalized()?;
validate_offline_disagg_replay_args(&config, router_mode)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)?
.normalize_session_starts()?
.speed_up_timing(arrival_speedup_ratio)?;
let started_at = Instant::now();
let report = crate::replay::offline::simulate_trace_workload_disagg(
config,
router_config,
prefill_load_estimator,
trace,
router_mode,
)?;
Ok(report.with_wall_time_ms(started_at.elapsed().as_secs_f64() * 1000.0))
}
pub fn simulate_trace_live_file(
args: MockEngineArgs,
trace_path: &Path,
trace_block_size: usize,
num_workers: usize,
arrival_speedup_ratio: f64,
) -> Result<TraceSimulationReport> {
simulate_trace_live_file_with_router_mode(
args,
None,
None,
trace_path,
trace_block_size,
num_workers,
arrival_speedup_ratio,
ReplayRouterMode::RoundRobin,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_trace_live_file_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
num_workers: usize,
arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_online_replay_args(&args, num_workers)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)?
.normalize_session_starts()?
.speed_up_timing(arrival_speedup_ratio)?;
online::simulate_trace_workload(
args,
router_config,
prefill_load_estimator,
trace,
num_workers,
router_mode,
)
}
pub fn simulate_trace_requests(
args: MockEngineArgs,
requests: Vec<DirectRequest>,
num_workers: usize,
arrival_speedup_ratio: f64,
) -> Result<TraceSimulationReport> {
simulate_trace_requests_with_router_mode(
args,
None,
None,
requests,
num_workers,
arrival_speedup_ratio,
ReplayRouterMode::RoundRobin,
)
}
pub fn simulate_trace_requests_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
requests: Vec<DirectRequest>,
num_workers: usize,
arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_offline_replay_args(&args, num_workers, router_mode)?;
if requests.is_empty() {
bail!("trace replay requires at least one request");
}
let started_at = Instant::now();
let report = crate::replay::offline::simulate_trace(
args,
router_config,
prefill_load_estimator,
requests,
num_workers,
arrival_speedup_ratio,
router_mode,
)?;
Ok(report.with_wall_time_ms(started_at.elapsed().as_secs_f64() * 1000.0))
}
pub fn simulate_trace_requests_disagg_with_router_mode(
config: OfflineDisaggReplayConfig,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
requests: Vec<DirectRequest>,
arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let config = config.normalized()?;
validate_offline_disagg_replay_args(&config, router_mode)?;
if requests.is_empty() {
bail!("trace replay requires at least one request");
}
let started_at = Instant::now();
let report = crate::replay::offline::simulate_trace_disagg(
config,
router_config,
prefill_load_estimator,
requests,
arrival_speedup_ratio,
router_mode,
)?;
Ok(report.with_wall_time_ms(started_at.elapsed().as_secs_f64() * 1000.0))
}
pub fn simulate_trace_live_requests(
args: MockEngineArgs,
requests: Vec<DirectRequest>,
num_workers: usize,
arrival_speedup_ratio: f64,
) -> Result<TraceSimulationReport> {
simulate_trace_live_requests_with_router_mode(
args,
None,
None,
requests,
num_workers,
arrival_speedup_ratio,
ReplayRouterMode::RoundRobin,
)
}
pub fn simulate_trace_live_requests_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
requests: Vec<DirectRequest>,
num_workers: usize,
arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_online_replay_args(&args, num_workers)?;
if requests.is_empty() {
bail!("trace replay requires at least one request");
}
online::simulate_trace_requests(
args,
router_config,
prefill_load_estimator,
requests,
num_workers,
arrival_speedup_ratio,
router_mode,
)
}
pub fn simulate_concurrency_file(
args: MockEngineArgs,
trace_path: &Path,
trace_block_size: usize,
max_in_flight: usize,
num_workers: usize,
) -> Result<TraceSimulationReport> {
simulate_concurrency_file_with_router_mode(
args,
None,
None,
trace_path,
trace_block_size,
max_in_flight,
num_workers,
ReplayRouterMode::RoundRobin,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_concurrency_file_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
max_in_flight: usize,
num_workers: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_offline_concurrency_args(&args, num_workers, max_in_flight, router_mode)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)?;
let started_at = Instant::now();
let report = simulate_concurrency_workload_with_router_mode(
args,
router_config,
prefill_load_estimator,
trace,
max_in_flight,
num_workers,
router_mode,
)?;
Ok(report.with_wall_time_ms(started_at.elapsed().as_secs_f64() * 1000.0))
}
pub fn simulate_concurrency_file_disagg_with_router_mode(
config: OfflineDisaggReplayConfig,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
max_in_flight: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let config = config.normalized()?;
validate_offline_disagg_concurrency_args(&config, max_in_flight, router_mode)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)?;
let started_at = Instant::now();
let report = simulate_concurrency_workload_disagg_with_router_mode(
config,
router_config,
prefill_load_estimator,
trace,
max_in_flight,
router_mode,
)?;
Ok(report.with_wall_time_ms(started_at.elapsed().as_secs_f64() * 1000.0))
}
pub fn simulate_concurrency_live_file(
args: MockEngineArgs,
trace_path: &Path,
trace_block_size: usize,
max_in_flight: usize,
num_workers: usize,
) -> Result<TraceSimulationReport> {
simulate_concurrency_live_file_with_router_mode(
args,
None,
None,
trace_path,
trace_block_size,
max_in_flight,
num_workers,
ReplayRouterMode::RoundRobin,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_concurrency_live_file_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
max_in_flight: usize,
num_workers: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_online_concurrency_args(&args, num_workers, max_in_flight)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)?;
online::simulate_concurrency_workload(
args,
router_config,
prefill_load_estimator,
trace,
max_in_flight,
num_workers,
router_mode,
)
}
pub fn simulate_concurrency_live_requests(
args: MockEngineArgs,
requests: Vec<DirectRequest>,
max_in_flight: usize,
num_workers: usize,
) -> Result<TraceSimulationReport> {
simulate_concurrency_live_requests_with_router_mode(
args,
None,
None,
requests,
max_in_flight,
num_workers,
ReplayRouterMode::RoundRobin,
)
}
pub fn simulate_concurrency_live_requests_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
requests: Vec<DirectRequest>,
max_in_flight: usize,
num_workers: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_online_concurrency_args(&args, num_workers, max_in_flight)?;
if requests.is_empty() {
bail!("concurrency replay requires at least one request");
}
online::simulate_concurrency_requests(
args,
router_config,
prefill_load_estimator,
requests,
max_in_flight,
num_workers,
router_mode,
)
}
pub fn simulate_concurrency_requests(
args: MockEngineArgs,
requests: Vec<DirectRequest>,
max_in_flight: usize,
num_workers: usize,
) -> Result<TraceSimulationReport> {
simulate_concurrency_requests_with_router_mode(
args,
None,
None,
requests,
max_in_flight,
num_workers,
ReplayRouterMode::RoundRobin,
)
}
pub fn simulate_concurrency_requests_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
requests: Vec<DirectRequest>,
max_in_flight: usize,
num_workers: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_offline_concurrency_args(&args, num_workers, max_in_flight, router_mode)?;
if requests.is_empty() {
bail!("concurrency replay requires at least one request");
}
crate::replay::offline::simulate_concurrency(
args,
router_config,
prefill_load_estimator,
requests,
max_in_flight,
num_workers,
router_mode,
)
}
pub fn simulate_concurrency_requests_disagg_with_router_mode(
config: OfflineDisaggReplayConfig,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
requests: Vec<DirectRequest>,
max_in_flight: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let config = config.normalized()?;
validate_offline_disagg_concurrency_args(&config, max_in_flight, router_mode)?;
if requests.is_empty() {
bail!("concurrency replay requires at least one request");
}
crate::replay::offline::simulate_concurrency_disagg(
config,
router_config,
prefill_load_estimator,
requests,
max_in_flight,
router_mode,
)
}
pub fn simulate_trace_workload(
args: MockEngineArgs,
trace: Trace,
num_workers: usize,
) -> Result<TraceSimulationReport> {
simulate_trace_workload_with_router_mode(
args,
None,
None,
trace,
num_workers,
ReplayRouterMode::RoundRobin,
)
}
pub fn simulate_trace_workload_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace: Trace,
num_workers: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_offline_replay_args(&args, num_workers, router_mode)?;
let started_at = Instant::now();
let report = crate::replay::offline::simulate_trace_workload(
args,
router_config,
prefill_load_estimator,
trace,
num_workers,
router_mode,
)?;
Ok(report.with_wall_time_ms(started_at.elapsed().as_secs_f64() * 1000.0))
}
pub fn simulate_trace_workload_disagg_with_router_mode(
config: OfflineDisaggReplayConfig,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace: Trace,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let config = config.normalized()?;
validate_offline_disagg_replay_args(&config, router_mode)?;
let started_at = Instant::now();
let report = crate::replay::offline::simulate_trace_workload_disagg(
config,
router_config,
prefill_load_estimator,
trace,
router_mode,
)?;
Ok(report.with_wall_time_ms(started_at.elapsed().as_secs_f64() * 1000.0))
}
pub fn simulate_trace_live_workload(
args: MockEngineArgs,
trace: Trace,
num_workers: usize,
) -> Result<TraceSimulationReport> {
simulate_trace_live_workload_with_router_mode(
args,
None,
None,
trace,
num_workers,
ReplayRouterMode::RoundRobin,
)
}
pub fn simulate_trace_live_workload_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace: Trace,
num_workers: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_online_replay_args(&args, num_workers)?;
online::simulate_trace_workload(
args,
router_config,
prefill_load_estimator,
trace,
num_workers,
router_mode,
)
}
pub fn simulate_concurrency_workload(
args: MockEngineArgs,
trace: Trace,
max_in_flight: usize,
num_workers: usize,
) -> Result<TraceSimulationReport> {
simulate_concurrency_workload_with_router_mode(
args,
None,
None,
trace,
max_in_flight,
num_workers,
ReplayRouterMode::RoundRobin,
)
}
pub fn simulate_concurrency_workload_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace: Trace,
max_in_flight: usize,
num_workers: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_offline_concurrency_args(&args, num_workers, max_in_flight, router_mode)?;
crate::replay::offline::simulate_concurrency_workload(
args,
router_config,
prefill_load_estimator,
trace,
max_in_flight,
num_workers,
router_mode,
)
}
pub fn simulate_concurrency_workload_disagg_with_router_mode(
config: OfflineDisaggReplayConfig,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace: Trace,
max_in_flight: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let config = config.normalized()?;
validate_offline_disagg_concurrency_args(&config, max_in_flight, router_mode)?;
crate::replay::offline::simulate_concurrency_workload_disagg(
config,
router_config,
prefill_load_estimator,
trace,
max_in_flight,
router_mode,
)
}
pub fn simulate_concurrency_live_workload(
args: MockEngineArgs,
trace: Trace,
max_in_flight: usize,
num_workers: usize,
) -> Result<TraceSimulationReport> {
simulate_concurrency_live_workload_with_router_mode(
args,
None,
None,
trace,
max_in_flight,
num_workers,
ReplayRouterMode::RoundRobin,
)
}
pub fn simulate_concurrency_live_workload_with_router_mode(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace: Trace,
max_in_flight: usize,
num_workers: usize,
router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
let args = args.normalized()?;
validate_online_concurrency_args(&args, num_workers, max_in_flight)?;
online::simulate_concurrency_workload(
args,
router_config,
prefill_load_estimator,
trace,
max_in_flight,
num_workers,
router_mode,
)
}