//! Configurable workload simulation for exercising [`BlockStore`]
//! implementations with different operation mixes, access patterns, and
//! block-size distributions.

use crate::traits::BlockStore;
use crate::utils::create_block;
use ipfrs_core::{Block, Cid, Result};
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::time::sleep;

/// Access pattern used to decide which block an operation touches.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WorkloadPattern {
    /// Every block is equally likely to be selected.
    Uniform,
    /// Skewed selection where a few blocks are "hot"; `alpha` is the Zipf exponent.
    Zipfian { alpha: f64 },
    /// Blocks are visited in order, wrapping around the dataset.
    Sequential,
    /// Alternating bursts of activity and idle periods.
    Bursty {
        burst_duration: Duration,
        idle_duration: Duration,
    },
    /// Recency-biased selection that decays exponentially across the dataset.
    TimeSeries { decay_factor: f64 },
}

/// Relative frequency of each block-store operation. The four ratios are
/// expected to sum to 1.0; see [`OperationMix::validate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMix {
    pub put_ratio: f64,
    pub get_ratio: f64,
    pub has_ratio: f64,
    pub delete_ratio: f64,
}

impl OperationMix {
    /// Read-heavy mix: 70% get, 15% put, 10% has, 5% delete.
    pub fn read_heavy() -> Self {
        Self {
            put_ratio: 0.15,
            get_ratio: 0.70,
            has_ratio: 0.10,
            delete_ratio: 0.05,
        }
    }

    /// Write-heavy mix: 60% put, 20% delete, 15% get, 5% has.
    pub fn write_heavy() -> Self {
        Self {
            put_ratio: 0.60,
            get_ratio: 0.15,
            has_ratio: 0.05,
            delete_ratio: 0.20,
        }
    }

    /// Balanced mix: 50% get, 25% put, 15% has, 10% delete.
    pub fn balanced() -> Self {
        Self {
            put_ratio: 0.25,
            get_ratio: 0.50,
            has_ratio: 0.15,
            delete_ratio: 0.10,
        }
    }

    /// Cache-like mix dominated by reads: 85% get, 10% put.
    pub fn cache() -> Self {
        Self {
            put_ratio: 0.10,
            get_ratio: 0.85,
            has_ratio: 0.04,
            delete_ratio: 0.01,
        }
    }

    /// Returns `true` if the four ratios sum to 1.0 within a 0.001 tolerance.
    pub fn validate(&self) -> bool {
        let sum = self.put_ratio + self.get_ratio + self.has_ratio + self.delete_ratio;
        (sum - 1.0).abs() < 0.001
    }
}
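
// A minimal usage sketch (not part of the original API): build a custom mix
// and check it with `validate` before handing it to a workload config. The
// helper name is hypothetical.
#[allow(dead_code)]
fn custom_mix_sketch() -> OperationMix {
    let mix = OperationMix {
        put_ratio: 0.40,
        get_ratio: 0.40,
        has_ratio: 0.10,
        delete_ratio: 0.10,
    };
    // 0.40 + 0.40 + 0.10 + 0.10 == 1.0, so validation passes.
    debug_assert!(mix.validate(), "operation ratios must sum to 1.0");
    mix
}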

/// Distribution of generated block sizes, in bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SizeDistribution {
    /// Every block has exactly `size` bytes.
    Fixed { size: usize },
    /// Sizes drawn uniformly from `min..=max`.
    Uniform { min: usize, max: usize },
    /// Sizes drawn from a normal distribution (truncated at 1 byte).
    Normal { mean: usize, stddev: usize },
    /// Three size classes; the percentages should sum to 1.0
    /// (`large_pct` is implied by the other two at sampling time).
    Mixed {
        small_size: usize,
        small_pct: f64,
        medium_size: usize,
        medium_pct: f64,
        large_size: usize,
        large_pct: f64,
    },
}

/// Parameters for a simulated workload run.
#[derive(Debug, Clone)]
pub struct WorkloadConfig {
    /// Total number of operations across all worker tasks.
    pub total_operations: usize,
    /// Number of pre-generated blocks the operations draw from.
    pub dataset_size: usize,
    /// Relative frequency of put/get/has/delete operations.
    pub operation_mix: OperationMix,
    /// Access pattern for choosing blocks.
    pub pattern: WorkloadPattern,
    /// Distribution of generated block sizes.
    pub size_distribution: SizeDistribution,
    /// Number of concurrent worker tasks.
    pub concurrency: usize,
    /// Maximum operations per second *per worker task*; 0 disables limiting.
    pub rate_limit: usize,
    /// Intended fraction of compressible blocks. Note: `generate_dataset`
    /// currently emits purely random bytes, so this field is not yet applied.
    pub compressible_ratio: f64,
}

impl Default for WorkloadConfig {
    fn default() -> Self {
        Self {
            total_operations: 10_000,
            dataset_size: 1_000,
            operation_mix: OperationMix::balanced(),
            pattern: WorkloadPattern::Uniform,
            size_distribution: SizeDistribution::Uniform {
                min: 1024,
                max: 65536,
            },
            concurrency: 4,
            rate_limit: 0,
            compressible_ratio: 0.5,
        }
    }
}
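
// Illustrative sketch: because `WorkloadConfig` implements `Default`,
// struct-update syntax can override just a few fields. The helper name
// `high_concurrency_config` is hypothetical, added for illustration only.
#[allow(dead_code)]
fn high_concurrency_config() -> WorkloadConfig {
    WorkloadConfig {
        concurrency: 16,
        rate_limit: 500, // cap each worker task at 500 ops/sec
        ..WorkloadConfig::default()
    }
}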

/// Aggregated results of a workload run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadResult {
    /// Total operations the run was configured to execute.
    pub total_operations: usize,
    /// Overall throughput in operations per second.
    pub ops_per_second: f64,
    /// Wall-clock duration of the run.
    pub duration: Duration,
    /// Number of operations executed, keyed by operation name.
    pub operation_counts: HashMap<String, usize>,
    /// Per-operation latencies in microseconds.
    pub operation_latencies: HashMap<String, Vec<u64>>,
    /// Number of operations that returned an error.
    pub errors: usize,
    /// Data throughput in bytes per second.
    pub throughput_bps: f64,
}

impl WorkloadResult {
    /// Mean latency in microseconds for `operation`; `None` if the operation
    /// was never recorded, `Some(0.0)` if recorded with no samples.
    pub fn avg_latency(&self, operation: &str) -> Option<f64> {
        self.operation_latencies.get(operation).map(|latencies| {
            if latencies.is_empty() {
                0.0
            } else {
                latencies.iter().sum::<u64>() as f64 / latencies.len() as f64
            }
        })
    }

    /// Latency in microseconds at the given percentile (e.g. 0.95).
    fn percentile_latency(&self, operation: &str, pct: f64) -> Option<u64> {
        self.operation_latencies
            .get(operation)
            .and_then(|latencies| {
                if latencies.is_empty() {
                    None
                } else {
                    let mut sorted = latencies.clone();
                    sorted.sort_unstable();
                    let idx = (sorted.len() as f64 * pct) as usize;
                    Some(sorted[idx.min(sorted.len() - 1)])
                }
            })
    }

    /// 95th-percentile latency in microseconds.
    pub fn p95_latency(&self, operation: &str) -> Option<u64> {
        self.percentile_latency(operation, 0.95)
    }

    /// 99th-percentile latency in microseconds.
    pub fn p99_latency(&self, operation: &str) -> Option<u64> {
        self.percentile_latency(operation, 0.99)
    }
}
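
// Hedged reporting sketch: walks the recorded operations and prints mean,
// p95, and p99 latencies (all in microseconds). `print_latency_summary` is
// an illustrative helper, not part of the original API.
#[allow(dead_code)]
fn print_latency_summary(result: &WorkloadResult) {
    for (op, count) in &result.operation_counts {
        let avg = result.avg_latency(op).unwrap_or(0.0);
        let p95 = result.p95_latency(op).unwrap_or(0);
        let p99 = result.p99_latency(op).unwrap_or(0);
        println!("{op}: n={count} avg={avg:.1}us p95={p95}us p99={p99}us");
    }
}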

/// Drives a configurable stream of operations against a [`BlockStore`].
pub struct WorkloadSimulator {
    config: WorkloadConfig,
    dataset: Vec<Block>,
    cids: Vec<Cid>,
}

impl WorkloadSimulator {
    /// Creates a simulator with an empty dataset; call
    /// [`generate_dataset`](Self::generate_dataset) before [`run`](Self::run).
    pub fn new(config: WorkloadConfig) -> Self {
        Self {
            config,
            dataset: Vec::new(),
            cids: Vec::new(),
        }
    }

    /// Fills the in-memory dataset with `dataset_size` random blocks,
    /// discarding any previously generated blocks.
    pub fn generate_dataset(&mut self) {
        let mut rng = rand::rng();
        self.dataset.clear();
        self.cids.clear();

        for _ in 0..self.config.dataset_size {
            let size = self.generate_block_size(&mut rng);
            let data: Vec<u8> = (0..size).map(|_| rng.random::<u8>()).collect();
            let block = create_block(data).expect("Failed to create block");
            self.cids.push(*block.cid());
            self.dataset.push(block);
        }
    }

    /// Samples one block size from the configured distribution.
    fn generate_block_size(&self, rng: &mut impl Rng) -> usize {
        match &self.config.size_distribution {
            SizeDistribution::Fixed { size } => *size,
            SizeDistribution::Uniform { min, max } => rng.random_range(*min..=*max),
            SizeDistribution::Normal { mean, stddev } => {
                // Box-Muller transform: turn two uniform samples into one
                // standard-normal sample, then scale and truncate at 1 byte.
                let u1: f64 = rng.random();
                let u2: f64 = rng.random();
                let z = (-2.0 * u1.ln()).sqrt() * (2.0 * std::f64::consts::PI * u2).cos();
                let size = *mean as f64 + z * (*stddev as f64);
                size.max(1.0) as usize
            }
            SizeDistribution::Mixed {
                small_size,
                small_pct,
                medium_size,
                medium_pct,
                large_size,
                // `large_pct` is the fall-through case below, so it is unused here.
                large_pct: _,
            } => {
                let r: f64 = rng.random();
                if r < *small_pct {
                    *small_size
                } else if r < *small_pct + *medium_pct {
                    *medium_size
                } else {
                    *large_size
                }
            }
        }
    }

    /// Picks a dataset index according to the configured access pattern.
    /// Currently unused by `run`, which selects blocks round-robin instead.
    /// Panics if the dataset is empty.
    #[allow(dead_code)]
    fn select_block_index(&self, rng: &mut impl Rng, operation_num: usize) -> usize {
        match &self.config.pattern {
            WorkloadPattern::Uniform => rng.random_range(0..self.dataset.len()),
            WorkloadPattern::Zipfian { alpha } => {
                // Rejection sampling for a bounded Zipf distribution. Note that
                // the exponent must not equal 1.0, since the inverse transform
                // below divides by (1 - alpha).
                let n = self.dataset.len() as f64;
                loop {
                    let u: f64 = rng.random();
                    let v: f64 = rng.random();
                    let x = ((n.powf(1.0 - alpha) - 1.0) * u + 1.0).powf(1.0 / (1.0 - alpha));
                    if x <= n && v * x.powf(*alpha) <= 1.0 {
                        return (x - 1.0) as usize;
                    }
                }
            }
            WorkloadPattern::Sequential => operation_num % self.dataset.len(),
            WorkloadPattern::Bursty { .. } => rng.random_range(0..self.dataset.len()),
            WorkloadPattern::TimeSeries { decay_factor } => {
                // Inverse-CDF sample from an exponential distribution, so low
                // (recent) indices are favored; clamped to the dataset bounds.
                let r: f64 = rng.random();
                let idx = (-r.ln() / decay_factor) as usize;
                idx.min(self.dataset.len() - 1)
            }
        }
    }

    /// Picks an operation name by partitioning [0, 1) according to the mix.
    /// Currently unused by `run`, which inlines the same logic in its tasks.
    #[allow(dead_code)]
    fn select_operation(&self, rng: &mut impl Rng) -> &str {
        let r: f64 = rng.random();
        let mix = &self.config.operation_mix;

        if r < mix.put_ratio {
            "put"
        } else if r < mix.put_ratio + mix.get_ratio {
            "get"
        } else if r < mix.put_ratio + mix.get_ratio + mix.has_ratio {
            "has"
        } else {
            "delete"
        }
    }

    /// Runs the configured workload against `store`, spreading operations
    /// across `concurrency` Tokio tasks; any remainder from the division goes
    /// to the last task. Panics if `concurrency` is 0.
    pub async fn run<S: BlockStore + Send + Sync + 'static>(
        &self,
        store: Arc<S>,
    ) -> Result<WorkloadResult> {
        let start = Instant::now();
        let mut operation_counts: HashMap<String, usize> = HashMap::new();
        let mut operation_latencies: HashMap<String, Vec<u64>> = HashMap::new();
        let mut errors = 0usize;
        let mut total_bytes = 0usize;

        let ops_per_task = self.config.total_operations / self.config.concurrency;
        let mut tasks = Vec::new();

        for task_id in 0..self.config.concurrency {
            let store = store.clone();
            let dataset = self.dataset.clone();
            let cids = self.cids.clone();
            let config = self.config.clone();
            let start_op = task_id * ops_per_task;
            let end_op = if task_id == self.config.concurrency - 1 {
                self.config.total_operations
            } else {
                (task_id + 1) * ops_per_task
            };

            let task = tokio::spawn(async move {
                use rand::SeedableRng;
                // Deterministic per-task seed keeps runs reproducible.
                let mut rng = rand::rngs::SmallRng::seed_from_u64(task_id as u64);
                let mut task_counts: HashMap<String, usize> = HashMap::new();
                let mut task_latencies: HashMap<String, Vec<u64>> = HashMap::new();
                let mut task_errors = 0usize;
                let mut task_bytes = 0usize;

                for op_num in start_op..end_op {
                    // The rate limit applies per task, so the aggregate rate
                    // is roughly `rate_limit * concurrency` ops/sec.
                    if config.rate_limit > 0 {
                        let delay = Duration::from_secs_f64(1.0 / config.rate_limit as f64);
                        sleep(delay).await;
                    }

                    // Blocks are visited round-robin here; the configured
                    // `WorkloadPattern` is not consulted inside worker tasks.
                    let idx = if dataset.is_empty() {
                        0
                    } else {
                        op_num % dataset.len()
                    };
                    let operation = if dataset.is_empty() {
                        "get"
                    } else {
                        let r: f64 = rng.random();
                        let mix = &config.operation_mix;
                        if r < mix.put_ratio {
                            "put"
                        } else if r < mix.put_ratio + mix.get_ratio {
                            "get"
                        } else if r < mix.put_ratio + mix.get_ratio + mix.has_ratio {
                            "has"
                        } else {
                            "delete"
                        }
                    };

                    let op_start = Instant::now();
                    let result = match operation {
                        "put" => {
                            if idx < dataset.len() {
                                task_bytes += dataset[idx].data().len();
                                store.put(&dataset[idx]).await
                            } else {
                                Ok(())
                            }
                        }
                        "get" => {
                            if idx < cids.len() {
                                match store.get(&cids[idx]).await {
                                    Ok(Some(block)) => {
                                        task_bytes += block.data().len();
                                        Ok(())
                                    }
                                    Ok(None) => Ok(()),
                                    Err(e) => Err(e),
                                }
                            } else {
                                Ok(())
                            }
                        }
                        "has" => {
                            if idx < cids.len() {
                                store.has(&cids[idx]).await.map(|_| ())
                            } else {
                                Ok(())
                            }
                        }
                        "delete" => {
                            if idx < cids.len() {
                                store.delete(&cids[idx]).await
                            } else {
                                Ok(())
                            }
                        }
                        _ => Ok(()),
                    };

                    // Record latency in microseconds, even for failed operations.
                    let latency = op_start.elapsed().as_micros() as u64;

                    *task_counts.entry(operation.to_string()).or_insert(0) += 1;
                    task_latencies
                        .entry(operation.to_string())
                        .or_default()
                        .push(latency);

                    if result.is_err() {
                        task_errors += 1;
                    }
                }

                (task_counts, task_latencies, task_errors, task_bytes)
            });

            tasks.push(task);
        }

        // Join all workers and merge their per-task statistics.
        for task in tasks {
            let (task_counts, task_latencies, task_errors, task_bytes) =
                task.await.expect("workload task panicked");

            for (op, count) in task_counts {
                *operation_counts.entry(op).or_insert(0) += count;
            }

            for (op, latencies) in task_latencies {
                operation_latencies.entry(op).or_default().extend(latencies);
            }

            errors += task_errors;
            total_bytes += task_bytes;
        }

        let duration = start.elapsed();
        let ops_per_second = self.config.total_operations as f64 / duration.as_secs_f64();
        let throughput_bps = total_bytes as f64 / duration.as_secs_f64();

        Ok(WorkloadResult {
            total_operations: self.config.total_operations,
            ops_per_second,
            duration,
            operation_counts,
            operation_latencies,
            errors,
            throughput_bps,
        })
    }
}
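
// End-to-end usage sketch: generate a dataset, then drive any `BlockStore`
// with the default configuration. `simulate_sketch` is an illustrative
// helper, not part of the original API.
#[allow(dead_code)]
async fn simulate_sketch<S: BlockStore + Send + Sync + 'static>(
    store: Arc<S>,
) -> Result<WorkloadResult> {
    let mut simulator = WorkloadSimulator::new(WorkloadConfig::default());
    // The dataset must exist before `run`, which indexes into it.
    simulator.generate_dataset();
    simulator.run(store).await
}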

/// Ready-made workload configurations for common scenarios.
pub struct WorkloadPresets;

impl WorkloadPresets {
    /// Small, fast configuration suitable for smoke tests.
    #[must_use]
    pub fn light_test() -> WorkloadConfig {
        WorkloadConfig {
            total_operations: 1_000,
            dataset_size: 100,
            operation_mix: OperationMix::balanced(),
            pattern: WorkloadPattern::Uniform,
            size_distribution: SizeDistribution::Uniform {
                min: 1024,
                max: 4096,
            },
            concurrency: 2,
            rate_limit: 0,
            compressible_ratio: 0.5,
        }
    }

    /// Medium-sized stress test with a skewed (Zipfian) access pattern.
    #[must_use]
    pub fn medium_stress() -> WorkloadConfig {
        WorkloadConfig {
            total_operations: 100_000,
            dataset_size: 10_000,
            operation_mix: OperationMix::balanced(),
            pattern: WorkloadPattern::Zipfian { alpha: 1.1 },
            size_distribution: SizeDistribution::Mixed {
                small_size: 1024,
                small_pct: 0.5,
                medium_size: 16384,
                medium_pct: 0.3,
                large_size: 65536,
                large_pct: 0.2,
            },
            concurrency: 8,
            rate_limit: 0,
            compressible_ratio: 0.7,
        }
    }

    /// Large stress test: one million operations over 100k blocks.
    #[must_use]
    pub fn heavy_stress() -> WorkloadConfig {
        WorkloadConfig {
            total_operations: 1_000_000,
            dataset_size: 100_000,
            operation_mix: OperationMix::balanced(),
            pattern: WorkloadPattern::Zipfian { alpha: 1.1 },
            size_distribution: SizeDistribution::Mixed {
                small_size: 1024,
                small_pct: 0.4,
                medium_size: 32768,
                medium_pct: 0.4,
                large_size: 262144,
                large_pct: 0.2,
            },
            concurrency: 16,
            rate_limit: 0,
            compressible_ratio: 0.6,
        }
    }

    /// CDN-style cache: read-dominated mix with strongly skewed popularity.
    #[must_use]
    pub fn cdn_cache() -> WorkloadConfig {
        WorkloadConfig {
            total_operations: 50_000,
            dataset_size: 5_000,
            operation_mix: OperationMix::cache(),
            pattern: WorkloadPattern::Zipfian { alpha: 1.2 },
            size_distribution: SizeDistribution::Mixed {
                small_size: 4096,
                small_pct: 0.3,
                medium_size: 65536,
                medium_pct: 0.5,
                large_size: 1048576,
                large_pct: 0.2,
            },
            concurrency: 12,
            rate_limit: 0,
            compressible_ratio: 0.8,
        }
    }

    /// Sequential, write-heavy ingestion at a throttled rate.
    #[must_use]
    pub fn ingestion_pipeline() -> WorkloadConfig {
        WorkloadConfig {
            total_operations: 100_000,
            dataset_size: 50_000,
            operation_mix: OperationMix::write_heavy(),
            pattern: WorkloadPattern::Sequential,
            size_distribution: SizeDistribution::Normal {
                mean: 32768,
                stddev: 8192,
            },
            concurrency: 8,
            rate_limit: 1000,
            compressible_ratio: 0.9,
        }
    }

    /// Read-heavy, recency-biased access over fixed-size blocks.
    #[must_use]
    pub fn time_series() -> WorkloadConfig {
        WorkloadConfig {
            total_operations: 20_000,
            dataset_size: 10_000,
            operation_mix: OperationMix::read_heavy(),
            pattern: WorkloadPattern::TimeSeries { decay_factor: 0.1 },
            size_distribution: SizeDistribution::Fixed { size: 8192 },
            concurrency: 4,
            rate_limit: 0,
            compressible_ratio: 0.6,
        }
    }
}
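
// Presets are plain `WorkloadConfig` values, so struct-update syntax can
// derive variants from them. A sketch with a hypothetical helper name:
#[allow(dead_code)]
fn throttled_light_test() -> WorkloadConfig {
    WorkloadConfig {
        rate_limit: 200, // cap each worker task at 200 ops/sec
        ..WorkloadPresets::light_test()
    }
}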

#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryBlockStore;

    #[test]
    fn test_operation_mix_validation() {
        let mix = OperationMix::balanced();
        assert!(mix.validate());

        // These ratios sum to 0.95, so validation must fail.
        let invalid_mix = OperationMix {
            put_ratio: 0.5,
            get_ratio: 0.3,
            has_ratio: 0.1,
            delete_ratio: 0.05,
        };
        assert!(!invalid_mix.validate());
    }

    #[test]
    fn test_dataset_generation() {
        let config = WorkloadPresets::light_test();
        let mut simulator = WorkloadSimulator::new(config);
        simulator.generate_dataset();

        assert_eq!(simulator.dataset.len(), 100);
        assert_eq!(simulator.cids.len(), 100);
    }

    #[tokio::test]
    async fn test_light_workload() {
        let config = WorkloadPresets::light_test();
        let mut simulator = WorkloadSimulator::new(config);
        simulator.generate_dataset();

        let store = Arc::new(MemoryBlockStore::new());
        let result = simulator.run(store).await.unwrap();

        assert_eq!(result.total_operations, 1_000);
        assert!(result.ops_per_second > 0.0);
        assert!(!result.operation_counts.is_empty());
    }

    #[tokio::test]
    async fn test_workload_latencies() {
        let config = WorkloadPresets::light_test();
        let mut simulator = WorkloadSimulator::new(config);
        simulator.generate_dataset();

        let store = Arc::new(MemoryBlockStore::new());
        let result = simulator.run(store).await.unwrap();

        for latencies in result.operation_latencies.values() {
            assert!(!latencies.is_empty());
        }

        assert!(result.p95_latency("get").is_some());
    }

    #[test]
    fn test_workload_presets() {
        let _light = WorkloadPresets::light_test();
        let _medium = WorkloadPresets::medium_stress();
        let _heavy = WorkloadPresets::heavy_stress();
        let _cdn = WorkloadPresets::cdn_cache();
        let _ingestion = WorkloadPresets::ingestion_pipeline();
        let _timeseries = WorkloadPresets::time_series();
    }
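
    // Added illustrative test (assumes, as elsewhere in this module, that the
    // child test module may call private methods): Zipfian selection must
    // always produce an index inside the dataset bounds.
    #[test]
    fn test_zipfian_selection_in_range() {
        let mut config = WorkloadPresets::light_test();
        config.pattern = WorkloadPattern::Zipfian { alpha: 1.1 };
        let mut simulator = WorkloadSimulator::new(config);
        simulator.generate_dataset();

        let mut rng = rand::rng();
        for op_num in 0..1_000 {
            let idx = simulator.select_block_index(&mut rng, op_num);
            assert!(idx < simulator.dataset.len());
        }
    }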
}