// ipfrs_storage/workload.rs

//! Workload simulation and generation for testing and benchmarking.
//!
//! This module provides utilities for generating realistic storage workloads
//! for testing, benchmarking, and capacity planning.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use rand::Rng;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;

use crate::traits::BlockStore;
use crate::utils::create_block;
use ipfrs_core::{Block, Cid, Result};
15
/// Access pattern used to decide which block each simulated operation targets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WorkloadPattern {
    /// Every block in the dataset is equally likely to be chosen.
    Uniform,
    /// Zipf distribution (80/20-style skew); `alpha` controls how strongly a
    /// few "hot" blocks dominate. NOTE(review): the sampler's closed form is
    /// undefined at `alpha == 1.0` — presets use 1.1/1.2.
    Zipfian { alpha: f64 },
    /// Blocks are visited in order, wrapping around the dataset.
    Sequential,
    /// Burst pattern with periods of high activity.
    /// NOTE(review): the two durations are currently not consumed by the
    /// simulator (index selection treats Bursty as uniform) — confirm intent.
    Bursty {
        burst_duration: Duration,
        idle_duration: Duration,
    },
    /// Time-series pattern: low (recent) indices are exponentially more
    /// likely; larger `decay_factor` concentrates access on fewer blocks.
    TimeSeries { decay_factor: f64 },
}
33
/// Relative frequency of each store operation in a workload.
///
/// The four ratios are probabilities and are expected to sum to 1.0; use
/// [`OperationMix::validate`] to check a hand-built mix.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OperationMix {
    /// Fraction of put operations (0.0 - 1.0)
    pub put_ratio: f64,
    /// Fraction of get operations (0.0 - 1.0)
    pub get_ratio: f64,
    /// Fraction of has operations (0.0 - 1.0)
    pub has_ratio: f64,
    /// Fraction of delete operations (0.0 - 1.0)
    pub delete_ratio: f64,
}
46
47impl OperationMix {
48    /// Create a read-heavy workload (80% reads, 20% writes)
49    pub fn read_heavy() -> Self {
50        Self {
51            put_ratio: 0.15,
52            get_ratio: 0.70,
53            has_ratio: 0.10,
54            delete_ratio: 0.05,
55        }
56    }
57
58    /// Create a write-heavy workload (80% writes, 20% reads)
59    pub fn write_heavy() -> Self {
60        Self {
61            put_ratio: 0.60,
62            get_ratio: 0.15,
63            has_ratio: 0.05,
64            delete_ratio: 0.20,
65        }
66    }
67
68    /// Create a balanced workload
69    pub fn balanced() -> Self {
70        Self {
71            put_ratio: 0.25,
72            get_ratio: 0.50,
73            has_ratio: 0.15,
74            delete_ratio: 0.10,
75        }
76    }
77
78    /// Create a cache-like workload (mostly reads, few writes)
79    pub fn cache() -> Self {
80        Self {
81            put_ratio: 0.10,
82            get_ratio: 0.85,
83            has_ratio: 0.04,
84            delete_ratio: 0.01,
85        }
86    }
87
88    /// Validate that ratios sum to 1.0
89    pub fn validate(&self) -> bool {
90        let sum = self.put_ratio + self.get_ratio + self.has_ratio + self.delete_ratio;
91        (sum - 1.0).abs() < 0.001
92    }
93}
94
/// Block size distribution used when generating the workload dataset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SizeDistribution {
    /// Every block has exactly `size` bytes.
    Fixed { size: usize },
    /// Uniform distribution between `min` and `max` (inclusive).
    Uniform { min: usize, max: usize },
    /// Normal (Gaussian) distribution; samples are clamped to at least 1 byte
    /// by the generator.
    Normal { mean: usize, stddev: usize },
    /// Mixed sizes: small/medium/large tiers picked by cumulative probability.
    Mixed {
        small_size: usize,
        small_pct: f64,
        medium_size: usize,
        medium_pct: f64,
        large_size: usize,
        // `large_pct` is implied by 1 - small_pct - medium_pct; the sampler
        // never reads it, it exists for documentation/serialization symmetry.
        large_pct: f64,
    },
}
114
/// Workload configuration: how many operations to run, against what dataset,
/// with which mix, pattern, sizes, and parallelism.
#[derive(Debug, Clone)]
pub struct WorkloadConfig {
    /// Total number of operations to perform across all tasks
    pub total_operations: usize,
    /// Number of unique blocks in the generated dataset
    pub dataset_size: usize,
    /// Relative frequency of put/get/has/delete operations
    pub operation_mix: OperationMix,
    /// Access pattern used to pick target blocks
    pub pattern: WorkloadPattern,
    /// Block size distribution for the generated dataset
    pub size_distribution: SizeDistribution,
    /// Concurrency level (number of parallel tasks)
    pub concurrency: usize,
    /// Rate limit (operations per second across all tasks, 0 = unlimited)
    pub rate_limit: usize,
    /// Percentage of compressible blocks (0.0 - 1.0)
    /// NOTE(review): not read anywhere in this module — presumably consumed by
    /// callers; verify before removing.
    pub compressible_ratio: f64,
}
135
136impl Default for WorkloadConfig {
137    fn default() -> Self {
138        Self {
139            total_operations: 10_000,
140            dataset_size: 1_000,
141            operation_mix: OperationMix::balanced(),
142            pattern: WorkloadPattern::Uniform,
143            size_distribution: SizeDistribution::Uniform {
144                min: 1024,
145                max: 65536,
146            },
147            concurrency: 4,
148            rate_limit: 0,
149            compressible_ratio: 0.5,
150        }
151    }
152}
153
/// Aggregate results of one workload execution.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadResult {
    /// Total operations executed
    pub total_operations: usize,
    /// Overall throughput in operations per second
    pub ops_per_second: f64,
    /// Wall-clock duration of the whole run
    pub duration: Duration,
    /// Number of operations executed, keyed by operation name
    /// ("put" / "get" / "has" / "delete")
    pub operation_counts: HashMap<String, usize>,
    /// Raw per-operation latency samples in microseconds, keyed by name
    pub operation_latencies: HashMap<String, Vec<u64>>,
    /// Number of operations that returned an error
    pub errors: usize,
    /// Data throughput in bytes per second (bytes put + bytes read)
    pub throughput_bps: f64,
}
172
173impl WorkloadResult {
174    /// Calculate average latency for an operation
175    pub fn avg_latency(&self, operation: &str) -> Option<f64> {
176        self.operation_latencies.get(operation).map(|latencies| {
177            if latencies.is_empty() {
178                0.0
179            } else {
180                latencies.iter().sum::<u64>() as f64 / latencies.len() as f64
181            }
182        })
183    }
184
185    /// Calculate P95 latency for an operation
186    pub fn p95_latency(&self, operation: &str) -> Option<u64> {
187        self.operation_latencies
188            .get(operation)
189            .and_then(|latencies| {
190                if latencies.is_empty() {
191                    None
192                } else {
193                    let mut sorted = latencies.clone();
194                    sorted.sort_unstable();
195                    let idx = (sorted.len() as f64 * 0.95) as usize;
196                    Some(sorted[idx.min(sorted.len() - 1)])
197                }
198            })
199    }
200
201    /// Calculate P99 latency for an operation
202    pub fn p99_latency(&self, operation: &str) -> Option<u64> {
203        self.operation_latencies
204            .get(operation)
205            .and_then(|latencies| {
206                if latencies.is_empty() {
207                    None
208                } else {
209                    let mut sorted = latencies.clone();
210                    sorted.sort_unstable();
211                    let idx = (sorted.len() as f64 * 0.99) as usize;
212                    Some(sorted[idx.min(sorted.len() - 1)])
213                }
214            })
215    }
216}
217
/// Workload simulator for generating and executing storage workloads.
pub struct WorkloadSimulator {
    /// Immutable run parameters (mix, pattern, sizes, concurrency, ...).
    config: WorkloadConfig,
    /// Pre-generated blocks forming the working set (targets of `put`).
    dataset: Vec<Block>,
    /// CIDs of `dataset`, index-aligned; targets of `get`/`has`/`delete`.
    cids: Vec<Cid>,
}
224
225impl WorkloadSimulator {
226    /// Create a new workload simulator with the given configuration
227    pub fn new(config: WorkloadConfig) -> Self {
228        Self {
229            config,
230            dataset: Vec::new(),
231            cids: Vec::new(),
232        }
233    }
234
235    /// Generate the initial dataset
236    pub fn generate_dataset(&mut self) {
237        let mut rng = rand::rng();
238        self.dataset.clear();
239        self.cids.clear();
240
241        for _ in 0..self.config.dataset_size {
242            let size = self.generate_block_size(&mut rng);
243            let data: Vec<u8> = (0..size).map(|_| rng.random::<u8>()).collect();
244            let block = create_block(data).expect("Failed to create block");
245            self.cids.push(*block.cid());
246            self.dataset.push(block);
247        }
248    }
249
250    /// Generate a block size according to the distribution
251    fn generate_block_size(&self, rng: &mut impl Rng) -> usize {
252        match &self.config.size_distribution {
253            SizeDistribution::Fixed { size } => *size,
254            SizeDistribution::Uniform { min, max } => rng.random_range(*min..=*max),
255            SizeDistribution::Normal { mean, stddev } => {
256                // Box-Muller transform for normal distribution
257                let u1: f64 = rng.random();
258                let u2: f64 = rng.random();
259                let z = (-2.0 * u1.ln()).sqrt() * (2.0 * std::f64::consts::PI * u2).cos();
260                let size = *mean as f64 + z * (*stddev as f64);
261                size.max(1.0) as usize
262            }
263            SizeDistribution::Mixed {
264                small_size,
265                small_pct,
266                medium_size,
267                medium_pct,
268                large_size,
269                large_pct: _,
270            } => {
271                let r: f64 = rng.random();
272                if r < *small_pct {
273                    *small_size
274                } else if r < *small_pct + *medium_pct {
275                    *medium_size
276                } else {
277                    *large_size
278                }
279            }
280        }
281    }
282
283    /// Select a block index according to the access pattern
284    #[allow(dead_code)]
285    fn select_block_index(&self, rng: &mut impl Rng, operation_num: usize) -> usize {
286        match &self.config.pattern {
287            WorkloadPattern::Uniform => rng.random_range(0..self.dataset.len()),
288            WorkloadPattern::Zipfian { alpha } => {
289                // Zipfian distribution using rejection sampling
290                let n = self.dataset.len() as f64;
291                loop {
292                    let u: f64 = rng.random();
293                    let v: f64 = rng.random();
294                    let x = ((n.powf(1.0 - alpha) - 1.0) * u + 1.0).powf(1.0 / (1.0 - alpha));
295                    if x <= n && v * x.powf(*alpha) <= 1.0 {
296                        return (x - 1.0) as usize;
297                    }
298                }
299            }
300            WorkloadPattern::Sequential => operation_num % self.dataset.len(),
301            WorkloadPattern::Bursty { .. } => rng.random_range(0..self.dataset.len()),
302            WorkloadPattern::TimeSeries { decay_factor } => {
303                // Exponential decay for time-series access
304                let r: f64 = rng.random();
305                let idx = (-r.ln() / decay_factor) as usize;
306                idx.min(self.dataset.len() - 1)
307            }
308        }
309    }
310
311    /// Select an operation according to the operation mix
312    #[allow(dead_code)]
313    fn select_operation(&self, rng: &mut impl Rng) -> &str {
314        let r: f64 = rng.random();
315        let mix = &self.config.operation_mix;
316
317        if r < mix.put_ratio {
318            "put"
319        } else if r < mix.put_ratio + mix.get_ratio {
320            "get"
321        } else if r < mix.put_ratio + mix.get_ratio + mix.has_ratio {
322            "has"
323        } else {
324            "delete"
325        }
326    }
327
328    /// Run the workload against a block store
329    pub async fn run<S: BlockStore + Send + Sync + 'static>(
330        &self,
331        store: Arc<S>,
332    ) -> Result<WorkloadResult> {
333        let start = Instant::now();
334        let mut operation_counts: HashMap<String, usize> = HashMap::new();
335        let mut operation_latencies: HashMap<String, Vec<u64>> = HashMap::new();
336        let mut errors = 0usize;
337        let mut total_bytes = 0usize;
338
339        // Divide operations among concurrent tasks
340        let ops_per_task = self.config.total_operations / self.config.concurrency;
341        let mut tasks = Vec::new();
342
343        for task_id in 0..self.config.concurrency {
344            let store = store.clone();
345            let dataset = self.dataset.clone();
346            let cids = self.cids.clone();
347            let config = self.config.clone();
348            let start_op = task_id * ops_per_task;
349            let end_op = if task_id == self.config.concurrency - 1 {
350                self.config.total_operations
351            } else {
352                (task_id + 1) * ops_per_task
353            };
354
355            let task = tokio::spawn(async move {
356                // Use a simple deterministic RNG for the task
357                use rand::SeedableRng;
358                let mut rng = rand::rngs::SmallRng::seed_from_u64(task_id as u64);
359                let mut task_counts: HashMap<String, usize> = HashMap::new();
360                let mut task_latencies: HashMap<String, Vec<u64>> = HashMap::new();
361                let mut task_errors = 0usize;
362                let mut task_bytes = 0usize;
363
364                for op_num in start_op..end_op {
365                    // Rate limiting
366                    if config.rate_limit > 0 {
367                        let delay = Duration::from_secs_f64(1.0 / config.rate_limit as f64);
368                        sleep(delay).await;
369                    }
370
371                    let idx = if dataset.is_empty() {
372                        0
373                    } else {
374                        op_num % dataset.len()
375                    };
376                    let operation = if dataset.is_empty() {
377                        "get"
378                    } else {
379                        let r: f64 = rng.random();
380                        let mix = &config.operation_mix;
381                        if r < mix.put_ratio {
382                            "put"
383                        } else if r < mix.put_ratio + mix.get_ratio {
384                            "get"
385                        } else if r < mix.put_ratio + mix.get_ratio + mix.has_ratio {
386                            "has"
387                        } else {
388                            "delete"
389                        }
390                    };
391
392                    let op_start = Instant::now();
393                    let result = match operation {
394                        "put" => {
395                            if idx < dataset.len() {
396                                task_bytes += dataset[idx].data().len();
397                                store.put(&dataset[idx]).await
398                            } else {
399                                Ok(())
400                            }
401                        }
402                        "get" => {
403                            if idx < cids.len() {
404                                match store.get(&cids[idx]).await {
405                                    Ok(Some(block)) => {
406                                        task_bytes += block.data().len();
407                                        Ok(())
408                                    }
409                                    Ok(None) => Ok(()),
410                                    Err(e) => Err(e),
411                                }
412                            } else {
413                                Ok(())
414                            }
415                        }
416                        "has" => {
417                            if idx < cids.len() {
418                                store.has(&cids[idx]).await.map(|_| ())
419                            } else {
420                                Ok(())
421                            }
422                        }
423                        "delete" => {
424                            if idx < cids.len() {
425                                store.delete(&cids[idx]).await
426                            } else {
427                                Ok(())
428                            }
429                        }
430                        _ => Ok(()),
431                    };
432
433                    let latency = op_start.elapsed().as_micros() as u64;
434
435                    *task_counts.entry(operation.to_string()).or_insert(0) += 1;
436                    task_latencies
437                        .entry(operation.to_string())
438                        .or_default()
439                        .push(latency);
440
441                    if result.is_err() {
442                        task_errors += 1;
443                    }
444                }
445
446                (task_counts, task_latencies, task_errors, task_bytes)
447            });
448
449            tasks.push(task);
450        }
451
452        // Collect results from all tasks
453        for task in tasks {
454            let (task_counts, task_latencies, task_errors, task_bytes) = task.await.unwrap();
455
456            for (op, count) in task_counts {
457                *operation_counts.entry(op).or_insert(0) += count;
458            }
459
460            for (op, latencies) in task_latencies {
461                operation_latencies.entry(op).or_default().extend(latencies);
462            }
463
464            errors += task_errors;
465            total_bytes += task_bytes;
466        }
467
468        let duration = start.elapsed();
469        let ops_per_second = self.config.total_operations as f64 / duration.as_secs_f64();
470        let throughput_bps = total_bytes as f64 / duration.as_secs_f64();
471
472        Ok(WorkloadResult {
473            total_operations: self.config.total_operations,
474            ops_per_second,
475            duration,
476            operation_counts,
477            operation_latencies,
478            errors,
479            throughput_bps,
480        })
481    }
482}
483
484/// Workload presets for common scenarios
485pub struct WorkloadPresets;
486
487impl WorkloadPresets {
488    /// Light testing workload (1K operations, 100 blocks)
489    #[must_use]
490    pub fn light_test() -> WorkloadConfig {
491        WorkloadConfig {
492            total_operations: 1_000,
493            dataset_size: 100,
494            operation_mix: OperationMix::balanced(),
495            pattern: WorkloadPattern::Uniform,
496            size_distribution: SizeDistribution::Uniform {
497                min: 1024,
498                max: 4096,
499            },
500            concurrency: 2,
501            rate_limit: 0,
502            compressible_ratio: 0.5,
503        }
504    }
505
506    /// Medium stress test (100K operations, 10K blocks)
507    #[must_use]
508    pub fn medium_stress() -> WorkloadConfig {
509        WorkloadConfig {
510            total_operations: 100_000,
511            dataset_size: 10_000,
512            operation_mix: OperationMix::balanced(),
513            pattern: WorkloadPattern::Zipfian { alpha: 1.1 },
514            size_distribution: SizeDistribution::Mixed {
515                small_size: 1024,
516                small_pct: 0.5,
517                medium_size: 16384,
518                medium_pct: 0.3,
519                large_size: 65536,
520                large_pct: 0.2,
521            },
522            concurrency: 8,
523            rate_limit: 0,
524            compressible_ratio: 0.7,
525        }
526    }
527
528    /// Heavy stress test (1M operations, 100K blocks)
529    #[must_use]
530    pub fn heavy_stress() -> WorkloadConfig {
531        WorkloadConfig {
532            total_operations: 1_000_000,
533            dataset_size: 100_000,
534            operation_mix: OperationMix::balanced(),
535            pattern: WorkloadPattern::Zipfian { alpha: 1.1 },
536            size_distribution: SizeDistribution::Mixed {
537                small_size: 1024,
538                small_pct: 0.4,
539                medium_size: 32768,
540                medium_pct: 0.4,
541                large_size: 262144,
542                large_pct: 0.2,
543            },
544            concurrency: 16,
545            rate_limit: 0,
546            compressible_ratio: 0.6,
547        }
548    }
549
550    /// CDN cache simulation (read-heavy, Zipfian access)
551    #[must_use]
552    pub fn cdn_cache() -> WorkloadConfig {
553        WorkloadConfig {
554            total_operations: 50_000,
555            dataset_size: 5_000,
556            operation_mix: OperationMix::cache(),
557            pattern: WorkloadPattern::Zipfian { alpha: 1.2 },
558            size_distribution: SizeDistribution::Mixed {
559                small_size: 4096,
560                small_pct: 0.3,
561                medium_size: 65536,
562                medium_pct: 0.5,
563                large_size: 1048576,
564                large_pct: 0.2,
565            },
566            concurrency: 12,
567            rate_limit: 0,
568            compressible_ratio: 0.8,
569        }
570    }
571
572    /// Data ingestion pipeline (write-heavy)
573    #[must_use]
574    pub fn ingestion_pipeline() -> WorkloadConfig {
575        WorkloadConfig {
576            total_operations: 100_000,
577            dataset_size: 50_000,
578            operation_mix: OperationMix::write_heavy(),
579            pattern: WorkloadPattern::Sequential,
580            size_distribution: SizeDistribution::Normal {
581                mean: 32768,
582                stddev: 8192,
583            },
584            concurrency: 8,
585            rate_limit: 1000, // 1000 ops/sec
586            compressible_ratio: 0.9,
587        }
588    }
589
590    /// Time-series data access
591    #[must_use]
592    pub fn time_series() -> WorkloadConfig {
593        WorkloadConfig {
594            total_operations: 20_000,
595            dataset_size: 10_000,
596            operation_mix: OperationMix::read_heavy(),
597            pattern: WorkloadPattern::TimeSeries { decay_factor: 0.1 },
598            size_distribution: SizeDistribution::Fixed { size: 8192 },
599            concurrency: 4,
600            rate_limit: 0,
601            compressible_ratio: 0.6,
602        }
603    }
604}
605
#[cfg(test)]
mod tests {
    use super::*;
    use crate::MemoryBlockStore;

    #[test]
    fn test_operation_mix_validation() {
        // Presets are constructed to sum to exactly 1.0.
        assert!(OperationMix::balanced().validate());

        // 0.5 + 0.3 + 0.1 + 0.05 = 0.95 — outside the tolerance, so invalid.
        let invalid = OperationMix {
            put_ratio: 0.5,
            get_ratio: 0.3,
            has_ratio: 0.1,
            delete_ratio: 0.05,
        };
        assert!(!invalid.validate());
    }

    #[test]
    fn test_dataset_generation() {
        let mut sim = WorkloadSimulator::new(WorkloadPresets::light_test());
        sim.generate_dataset();

        // The light preset asks for exactly 100 blocks, with aligned CIDs.
        assert_eq!(sim.dataset.len(), 100);
        assert_eq!(sim.cids.len(), 100);
    }

    #[tokio::test]
    async fn test_light_workload() {
        let mut sim = WorkloadSimulator::new(WorkloadPresets::light_test());
        sim.generate_dataset();

        let result = sim
            .run(Arc::new(MemoryBlockStore::new()))
            .await
            .unwrap();

        assert_eq!(result.total_operations, 1_000);
        assert!(result.ops_per_second > 0.0);
        assert!(!result.operation_counts.is_empty());
    }

    #[tokio::test]
    async fn test_workload_latencies() {
        let mut sim = WorkloadSimulator::new(WorkloadPresets::light_test());
        sim.generate_dataset();

        let result = sim
            .run(Arc::new(MemoryBlockStore::new()))
            .await
            .unwrap();

        // Every operation that ran must have recorded at least one sample.
        for samples in result.operation_latencies.values() {
            assert!(!samples.is_empty());
        }

        // p95 can legitimately be 0 µs on an in-memory store; only verify
        // that the percentile machinery yields a value at all.
        assert!(result.p95_latency("get").is_some());
    }

    #[test]
    fn test_workload_presets() {
        // Presets must at least construct without panicking.
        let _ = WorkloadPresets::light_test();
        let _ = WorkloadPresets::medium_stress();
        let _ = WorkloadPresets::heavy_stress();
        let _ = WorkloadPresets::cdn_cache();
        let _ = WorkloadPresets::ingestion_pipeline();
        let _ = WorkloadPresets::time_series();
    }
}