use super::core::{StressTestConfig, StressTestResult, run_stress_test};
use super::parallel_orchestrations::ProviderStressFactory;
use crate::runtime::registry::ActivityRegistry;
use crate::{ActivityContext, OrchestrationContext, OrchestrationRegistry};
/// Configuration for the large-payload stress test.
#[derive(Debug, Clone)]
pub struct LargePayloadConfig {
    /// Shared stress-test settings (concurrency, duration, timeouts).
    pub base: StressTestConfig,
    /// Size in KiB of payloads produced by the "SmallPayloadTask" activity.
    pub small_payload_kb: usize,
    /// Size in KiB of payloads produced by the "MediumPayloadTask" activity.
    pub medium_payload_kb: usize,
    /// Size in KiB of payloads produced by the "LargePayloadTask" activity.
    pub large_payload_kb: usize,
    /// Number of payload activities fanned out per orchestration, split
    /// roughly into thirds across small/medium/large phases.
    pub activity_count: usize,
    /// Number of sub-orchestrations spawned per orchestration.
    pub sub_orch_count: usize,
}
impl Default for LargePayloadConfig {
    /// Defaults sized for a quick run: 10/50/100 KiB payloads, 20 activities,
    /// 5 sub-orchestrations, and a 10-second window with modest concurrency.
    fn default() -> Self {
        let base = StressTestConfig {
            max_concurrent: 5,
            duration_secs: 10,
            tasks_per_instance: 1,
            activity_delay_ms: 5,
            orch_concurrency: 1,
            worker_concurrency: 1,
            wait_timeout_secs: 120,
        };
        Self {
            base,
            small_payload_kb: 10,
            medium_payload_kb: 50,
            large_payload_kb: 100,
            activity_count: 20,
            sub_orch_count: 5,
        }
    }
}
/// Run the large-payload stress test against `factory` using the default
/// [`LargePayloadConfig`].
pub async fn run_large_payload_test(
    factory: &dyn ProviderStressFactory,
) -> Result<StressTestResult, Box<dyn std::error::Error>> {
    run_large_payload_test_with_config(factory, LargePayloadConfig::default()).await
}
pub async fn run_large_payload_test_with_config(
factory: &dyn ProviderStressFactory,
config: LargePayloadConfig,
) -> Result<StressTestResult, Box<dyn std::error::Error>> {
let provider = factory.create_provider().await;
let activities = create_large_payload_activities(
config.base.activity_delay_ms,
config.small_payload_kb,
config.medium_payload_kb,
config.large_payload_kb,
);
let orchestrations = create_large_payload_orchestrations(
config.small_payload_kb,
config.medium_payload_kb,
config.large_payload_kb,
config.activity_count,
config.sub_orch_count,
);
run_stress_test(config.base, provider, activities, orchestrations).await
}
/// Build the activity registry for the payload test.
///
/// Registers three activities — "SmallPayloadTask", "MediumPayloadTask",
/// and "LargePayloadTask" — that differ only in the configured payload
/// size. Each sleeps for `delay_ms` and then returns a JSON envelope
/// echoing its input alongside a generated payload of the given size.
fn create_large_payload_activities(
    delay_ms: u64,
    small_kb: usize,
    medium_kb: usize,
    large_kb: usize,
) -> ActivityRegistry {
    ActivityRegistry::builder()
        .register(
            "SmallPayloadTask",
            move |_ctx: ActivityContext, input: String| async move {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                Ok(payload_task_response(&input, small_kb))
            },
        )
        .register(
            "MediumPayloadTask",
            move |_ctx: ActivityContext, input: String| async move {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                Ok(payload_task_response(&input, medium_kb))
            },
        )
        .register(
            "LargePayloadTask",
            move |_ctx: ActivityContext, input: String| async move {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                Ok(payload_task_response(&input, large_kb))
            },
        )
        .build()
}

/// Shared response builder for the three payload activities: echoes the
/// activity input and attaches a freshly generated payload of `kb` KiB.
fn payload_task_response(input: &str, kb: usize) -> String {
    serde_json::to_string(&serde_json::json!({
        "input": input,
        "payload": generate_payload(kb),
        "size_kb": kb
    }))
    .unwrap()
}
/// Build the orchestration registry for the payload test.
///
/// "FanoutOrchestration" and "LargePayloadOrchestration" deliberately run
/// the same workflow; the closure captures only `Copy` values, so a single
/// closure value can be registered under both names.
fn create_large_payload_orchestrations(
    small_kb: usize,
    medium_kb: usize,
    large_kb: usize,
    activity_count: usize,
    sub_orch_count: usize,
) -> OrchestrationRegistry {
    let main_workflow = move |ctx: OrchestrationContext, input: String| {
        large_payload_orchestration(
            ctx,
            input,
            small_kb,
            medium_kb,
            large_kb,
            activity_count,
            sub_orch_count,
        )
    };
    OrchestrationRegistry::builder()
        .register("FanoutOrchestration", main_workflow)
        .register("LargePayloadOrchestration", main_workflow)
        .register(
            "LargePayloadSubOrchestration",
            move |ctx: OrchestrationContext, input: String| {
                large_payload_sub_orchestration(ctx, input, small_kb, medium_kb)
            },
        )
        .build()
}
/// Main stress workflow: fans out small/medium/large payload activities,
/// runs sub-orchestrations via `ctx.join`, finishes with a fixed tail of
/// small tasks, and returns a JSON summary that itself carries a large
/// payload.
///
/// Activities are awaited one at a time in a fixed order; only the
/// sub-orchestrations are scheduled up front and joined together.
async fn large_payload_orchestration(
    ctx: OrchestrationContext,
    input: String,
    small_kb: usize,
    medium_kb: usize,
    large_kb: usize,
    activity_count: usize,
    sub_orch_count: usize,
) -> Result<String, String> {
    // Input is only validated as JSON here; its contents are unused.
    let _config: serde_json::Value = serde_json::from_str(&input).map_err(|e| format!("Invalid input: {e}"))?;
    let mut results = Vec::new();
    // Rough bookkeeping of history events produced (+2 per activity,
    // +4 per sub-orchestration) — an estimate reported in the summary,
    // not an exact count.
    let mut event_count = 0;
    // Phase 1: small-payload activities (~1/3 of activity_count).
    let small_count = activity_count / 3;
    for i in 0..small_count {
        let input = generate_payload(small_kb);
        let result = ctx
            .schedule_activity(
                "SmallPayloadTask",
                serde_json::json!({"index": i, "data": input}).to_string(),
            )
            .await;
        results.push(result);
        event_count += 2;
    }
    // Phase 2: medium-payload activities; inputs are half the medium size.
    let medium_count = activity_count / 3;
    for i in 0..medium_count {
        let input = generate_payload(medium_kb / 2);
        let result = ctx
            .schedule_activity(
                "MediumPayloadTask",
                serde_json::json!({"index": i, "data": input}).to_string(),
            )
            .await;
        results.push(result);
        event_count += 2;
    }
    // Phase 3: large-payload activities take the remainder so the three
    // phases always sum to exactly activity_count; inputs are a quarter of
    // the large size.
    let large_count = activity_count - small_count - medium_count;
    for i in 0..large_count {
        let input = generate_payload(large_kb / 4);
        let result = ctx
            .schedule_activity(
                "LargePayloadTask",
                serde_json::json!({"index": i, "data": input}).to_string(),
            )
            .await;
        results.push(result);
        event_count += 2;
    }
    // Phase 4: schedule all sub-orchestrations first, then join the
    // resulting futures together.
    let mut sub_orch_futures = Vec::new();
    for i in 0..sub_orch_count {
        let input = generate_payload(medium_kb);
        let sub_input = serde_json::json!({
            "index": i,
            "payload": input
        })
        .to_string();
        sub_orch_futures.push(ctx.schedule_sub_orchestration("LargePayloadSubOrchestration", sub_input));
        event_count += 4;
    }
    let sub_results = ctx.join(sub_orch_futures).await;
    // Phase 5: a fixed tail of 10 small tasks with tiny plain-string inputs.
    for i in 0..10 {
        let input = format!("final-task-{i}");
        let result = ctx.schedule_activity("SmallPayloadTask", input).await;
        results.push(result);
        event_count += 2;
    }
    // Summarize successes and return a deliberately large result payload.
    let success_count = results.iter().filter(|r| r.is_ok()).count();
    let sub_success_count = sub_results.iter().filter(|r| r.is_ok()).count();
    let result_payload = generate_payload(large_kb);
    Ok(serde_json::to_string(&serde_json::json!({
        "activities_completed": success_count,
        "sub_orchestrations_completed": sub_success_count,
        "estimated_event_count": event_count,
        "result_payload": result_payload,
        "result_size_kb": large_kb
    }))
    .unwrap())
}
/// Sub-orchestration: runs one small and one medium payload activity in
/// sequence and returns a JSON summary carrying a medium-sized payload.
async fn large_payload_sub_orchestration(
    ctx: OrchestrationContext,
    input: String,
    small_kb: usize,
    medium_kb: usize,
) -> Result<String, String> {
    // Parse the envelope; the "payload" field is read but otherwise unused.
    let envelope: serde_json::Value =
        serde_json::from_str(&input).map_err(|e| format!("Invalid input: {e}"))?;
    let _input_payload = envelope["payload"].as_str().unwrap_or("");
    // Two activities awaited sequentially, in a fixed order.
    let small_result = ctx
        .schedule_activity("SmallPayloadTask", generate_payload(small_kb))
        .await;
    let medium_result = ctx
        .schedule_activity("MediumPayloadTask", generate_payload(medium_kb / 2))
        .await;
    let summary = serde_json::json!({
        "success": small_result.is_ok() && medium_result.is_ok(),
        "result_payload": generate_payload(medium_kb),
        "size_kb": medium_kb
    });
    Ok(serde_json::to_string(&summary).unwrap())
}
/// Generate a deterministic payload of exactly `kb * 1024` bytes by
/// repeating a fixed 72-character ASCII pattern.
///
/// Builds the string in a single pre-sized allocation; the previous
/// implementation allocated the full rounded-up `repeat` buffer and then
/// copied it a second time via slicing + `to_string`.
fn generate_payload(kb: usize) -> String {
    const PATTERN: &str = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%^&*()";
    let target_bytes = kb * 1024;
    // Reserve one extra pattern so the final push never reallocates.
    let mut payload = String::with_capacity(target_bytes + PATTERN.len());
    while payload.len() < target_bytes {
        payload.push_str(PATTERN);
    }
    // PATTERN is pure ASCII, so truncating at any byte index is safe.
    payload.truncate(target_bytes);
    payload
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Payloads must be exactly the requested number of kibibytes,
    /// including the zero-size edge case.
    #[test]
    fn test_generate_payload_sizes() {
        for kb in [0, 10, 50, 100] {
            assert_eq!(generate_payload(kb).len(), kb * 1024);
        }
    }
}