sochdb_storage/
parallel_merge.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Parallel K-Way Merge for Compaction
16//!
17//! Implements multi-threaded merging of SSTables for faster compaction.
18//!
19//! ## jj.md Task 8: Parallel K-Way Merge
20//!
21//! Goals:
22//! - 3-5x faster compaction with 4+ cores
23//! - Reduced compaction debt accumulation
24//! - Better CPU utilization during background work
25//!
26//! ## Architecture
27//!
28//! ```text
29//! Input SSTables (parallel read):
30//! [SST1] --read--> [Decompressor1] --\
31//! [SST2] --read--> [Decompressor2] ----> [Lock-free Merge Queue]
32//! [SST3] --read--> [Decompressor3] --/
33//!
34//! Merge Phase (parallelized by key range):
35//! Range [0, N/4)    --merge--> [Writer1]
36//! Range [N/4, N/2)  --merge--> [Writer2]
37//! Range [N/2, 3N/4) --merge--> [Writer3]
38//! Range [3N/4, N)   --merge--> [Writer4]
39//! ```
40//!
41//! Reference: RocksDB uses `SubcompactionState` for parallel range compaction
42
43use crossbeam_channel::{Receiver, Sender, bounded};
44use std::cmp::Ordering;
45use std::collections::BinaryHeap;
46use std::sync::Arc;
47use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
48#[cfg(test)]
49use std::thread;
50
/// One edge record travelling through the merge pipeline (simplified representation).
///
/// Comparisons (`==`, `<`, `cmp`) look only at `timestamp_us` and `edge_id`;
/// the tombstone flag, the payload and the source index are ignored by them.
#[derive(Clone, Debug)]
pub struct MergeEdge {
    /// Identifier of the edge
    pub edge_id: u128,
    /// Timestamp of the edge, in microseconds
    pub timestamp_us: u64,
    /// `true` when this record marks a deletion
    pub is_tombstone: bool,
    /// Raw edge payload (128 bytes)
    pub data: [u8; 128],
    /// Index of the SSTable this record came from
    pub source_idx: usize,
}

impl PartialEq for MergeEdge {
    /// Two edges are equal when both their timestamp and their id match.
    fn eq(&self, other: &Self) -> bool {
        (self.timestamp_us, self.edge_id) == (other.timestamp_us, other.edge_id)
    }
}

impl Eq for MergeEdge {}

impl PartialOrd for MergeEdge {
    /// Always `Some`: defers to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for MergeEdge {
    /// Reversed `(timestamp_us, edge_id)` order.
    ///
    /// `BinaryHeap` is a max-heap, so comparing `other` against `self` makes the
    /// edge with the smallest timestamp (then smallest id) the "greatest"
    /// element, i.e. the one popped first.
    fn cmp(&self, other: &Self) -> Ordering {
        other
            .timestamp_us
            .cmp(&self.timestamp_us)
            .then_with(|| other.edge_id.cmp(&self.edge_id))
    }
}
89
90/// Entry in the merge heap
91struct HeapEntry {
92    edge: MergeEdge,
93    source_idx: usize,
94}
95
96impl PartialEq for HeapEntry {
97    fn eq(&self, other: &Self) -> bool {
98        self.edge == other.edge
99    }
100}
101
102impl Eq for HeapEntry {}
103
104impl PartialOrd for HeapEntry {
105    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
106        Some(self.cmp(other))
107    }
108}
109
110impl Ord for HeapEntry {
111    fn cmp(&self, other: &Self) -> Ordering {
112        self.edge.cmp(&other.edge)
113    }
114}
115
116/// Configuration for parallel merge
117#[derive(Debug, Clone)]
118pub struct ParallelMergeConfig {
119    /// Number of reader threads
120    pub reader_threads: usize,
121    /// Number of merger threads (for range partitioning)
122    pub merger_threads: usize,
123    /// Channel buffer size per input
124    pub channel_buffer_size: usize,
125    /// Batch size for reading
126    pub read_batch_size: usize,
127}
128
129impl Default for ParallelMergeConfig {
130    fn default() -> Self {
131        let num_cpus = num_cpus::get();
132        Self {
133            reader_threads: (num_cpus / 2).max(1),
134            merger_threads: (num_cpus / 2).max(1),
135            channel_buffer_size: 1024,
136            read_batch_size: 256,
137        }
138    }
139}
140
141impl ParallelMergeConfig {
142    /// Create config optimized for the given number of input files
143    pub fn for_inputs(num_inputs: usize) -> Self {
144        let num_cpus = num_cpus::get();
145        Self {
146            reader_threads: num_inputs.min(num_cpus),
147            merger_threads: (num_cpus / 2).max(1),
148            channel_buffer_size: 1024,
149            read_batch_size: 256,
150        }
151    }
152}
153
/// Shared counters describing the progress of a parallel merge.
///
/// Every counter is an atomic updated with relaxed ordering, so the struct can
/// be bumped through a shared reference from several threads.
#[derive(Debug, Default)]
pub struct ParallelMergeStats {
    /// Number of edges read from the inputs
    pub edges_read: AtomicU64,
    /// Number of edges written to the output
    pub edges_written: AtomicU64,
    /// Number of tombstones recorded via `record_tombstone`
    pub tombstones_filtered: AtomicU64,
    /// Number of duplicate edges that were merged away
    pub duplicates_merged: AtomicU64,
}

impl ParallelMergeStats {
    /// Builds a zeroed set of counters, already wrapped in an `Arc` for sharing.
    pub fn new() -> Arc<Self> {
        Arc::default()
    }

    /// Adds `count` to the number of edges read.
    pub fn record_read(&self, count: u64) {
        let _previous = self.edges_read.fetch_add(count, AtomicOrdering::Relaxed);
    }

    /// Adds `count` to the number of edges written.
    pub fn record_written(&self, count: u64) {
        let _previous = self.edges_written.fetch_add(count, AtomicOrdering::Relaxed);
    }

    /// Counts one tombstone.
    pub fn record_tombstone(&self) {
        let _previous = self.tombstones_filtered.fetch_add(1, AtomicOrdering::Relaxed);
    }

    /// Counts one merged duplicate.
    pub fn record_duplicate(&self) {
        let _previous = self.duplicates_merged.fetch_add(1, AtomicOrdering::Relaxed);
    }

    /// Copies the current counter values into a plain-data snapshot.
    ///
    /// Each counter is loaded independently (relaxed), so the snapshot is not
    /// an atomic view across all four counters.
    pub fn snapshot(&self) -> ParallelMergeStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(AtomicOrdering::Relaxed);
        let Self {
            edges_read,
            edges_written,
            tombstones_filtered,
            duplicates_merged,
        } = self;

        ParallelMergeStatsSnapshot {
            edges_read: read(edges_read),
            edges_written: read(edges_written),
            tombstones_filtered: read(tombstones_filtered),
            duplicates_merged: read(duplicates_merged),
        }
    }
}

/// Point-in-time copy of the [`ParallelMergeStats`] counters
#[derive(Debug, Clone)]
pub struct ParallelMergeStatsSnapshot {
    /// Edges read from the inputs
    pub edges_read: u64,
    /// Edges written to the output
    pub edges_written: u64,
    /// Tombstones recorded
    pub tombstones_filtered: u64,
    /// Duplicates merged away
    pub duplicates_merged: u64,
}
213
214// =============================================================================
215// Task 7 Enhancement: I/O Throttling for Compaction
216// =============================================================================
217
/// Token bucket-based I/O throttler for compaction
///
/// ## Purpose
/// Prevents compaction from saturating I/O bandwidth and causing
/// latency spikes for foreground queries.
///
/// ## Algorithm
/// Uses token bucket with configurable:
/// - Bucket capacity (burst allowance)
/// - Refill rate (sustained IOPS/bandwidth limit)
///
/// Tokens are credited lazily: every acquire/inspect call first converts the
/// time elapsed since the last credit into tokens, capped at the capacity.
///
/// ## Reference
/// Based on RocksDB's `rate_limiter` implementation which uses
/// token bucket for I/O bandwidth control.
#[derive(Debug)]
pub struct IoThrottler {
    /// Available tokens (bytes or IOPS)
    tokens: AtomicU64,
    /// Maximum bucket capacity
    capacity: u64,
    /// Refill rate per second
    refill_rate: u64,
    /// Last refill timestamp (microseconds)
    last_refill: AtomicU64,
    /// Total bytes throttled (for stats)
    total_throttled: AtomicU64,
    /// Total wait time (microseconds)
    total_wait_us: AtomicU64,
    /// Whether throttling is enabled
    enabled: std::sync::atomic::AtomicBool,
}

impl IoThrottler {
    /// Create a new I/O throttler
    ///
    /// The bucket starts full (`burst_bytes` tokens available).
    ///
    /// # Arguments
    /// * `rate_bytes_per_sec` - Sustained I/O rate limit
    /// * `burst_bytes` - Maximum burst allowance
    pub fn new(rate_bytes_per_sec: u64, burst_bytes: u64) -> Self {
        Self {
            tokens: AtomicU64::new(burst_bytes),
            capacity: burst_bytes,
            refill_rate: rate_bytes_per_sec,
            last_refill: AtomicU64::new(Self::now_us()),
            total_throttled: AtomicU64::new(0),
            total_wait_us: AtomicU64::new(0),
            enabled: std::sync::atomic::AtomicBool::new(true),
        }
    }

    /// Create throttler with sensible defaults for compaction
    ///
    /// Defaults to 100 MB/s sustained, 10 MB burst
    pub fn for_compaction() -> Self {
        Self::new(100 * 1024 * 1024, 10 * 1024 * 1024)
    }

    /// Create throttler that doesn't limit (for testing)
    pub fn unlimited() -> Self {
        let throttler = Self::new(u64::MAX, u64::MAX);
        throttler.enabled.store(false, AtomicOrdering::Relaxed);
        throttler
    }

    /// Wall-clock time in microseconds since the Unix epoch.
    ///
    /// Returns 0 instead of panicking if the system clock reads earlier than
    /// the epoch; `refill` copes with the clock moving backwards.
    fn now_us() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_micros().min(u128::from(u64::MAX)) as u64)
            .unwrap_or(0)
    }

    /// Microseconds needed to earn `deficit` tokens at the configured rate.
    ///
    /// A zero rate is treated as 1 byte/s so the division cannot panic.
    fn refill_time_us(&self, deficit: u64) -> u64 {
        let rate = u128::from(self.refill_rate.max(1));
        (u128::from(deficit) * 1_000_000 / rate).min(u128::from(u64::MAX)) as u64
    }

    /// Atomically take `bytes` tokens if that many are available.
    fn try_consume(&self, bytes: u64) -> bool {
        self.tokens
            .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |current| {
                current.checked_sub(bytes)
            })
            .is_ok()
    }

    /// Refill tokens based on elapsed time
    fn refill(&self) {
        let now = Self::now_us();
        let last = self.last_refill.load(AtomicOrdering::Acquire);

        if now < last {
            // Clock moved backwards: re-anchor so refilling resumes immediately
            // instead of stalling until the clock catches up.
            let _ = self.last_refill.compare_exchange(
                last,
                now,
                AtomicOrdering::AcqRel,
                AtomicOrdering::Acquire,
            );
            return;
        }

        // tokens_to_add = elapsed_seconds * rate = elapsed_us * rate / 1_000_000
        let elapsed_us = now - last;
        let tokens_to_add = (u128::from(elapsed_us) * u128::from(self.refill_rate) / 1_000_000)
            .min(u128::from(u64::MAX)) as u64;

        if tokens_to_add == 0 {
            // Leave `last_refill` untouched so sub-token intervals keep
            // accumulating; advancing it here would starve frequent callers
            // at low rates.
            return;
        }

        // Claim the interval [last, now). If another thread claimed it first,
        // that thread credits the tokens and we must not credit them twice.
        if self
            .last_refill
            .compare_exchange(last, now, AtomicOrdering::AcqRel, AtomicOrdering::Acquire)
            .is_err()
        {
            return;
        }

        // Read-modify-write in one atomic step so a concurrent consumer's
        // subtraction is never overwritten.
        let capacity = self.capacity;
        let _ = self
            .tokens
            .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |current| {
                Some(current.saturating_add(tokens_to_add).min(capacity))
            });
    }

    /// Request I/O tokens, blocking if necessary
    ///
    /// Requests larger than the bucket capacity are served in capacity-sized
    /// chunks, so they complete at the sustained rate instead of never
    /// succeeding. With a zero capacity the bucket cannot hold tokens, so the
    /// call simply sleeps for `bytes / rate`.
    ///
    /// Returns the actual wait time in microseconds (sum of the sleep
    /// durations requested; 0 when throttling is disabled).
    ///
    /// # Blocking
    /// Sleeps the calling thread in steps of 100us to 100ms. With a zero
    /// refill rate no tokens are ever earned, so once the burst is spent the
    /// call does not return.
    pub fn acquire(&self, bytes: u64) -> u64 {
        if bytes == 0 || !self.enabled.load(AtomicOrdering::Relaxed) {
            return 0;
        }

        let mut total_wait = 0u64;

        if self.capacity == 0 {
            // No bucket to draw from: pace purely by time.
            total_wait = self.refill_time_us(bytes);
            std::thread::sleep(std::time::Duration::from_micros(total_wait));
        } else {
            let mut remaining = bytes;
            while remaining > 0 {
                let chunk = remaining.min(self.capacity);

                self.refill();
                if self.try_consume(chunk) {
                    remaining -= chunk;
                    continue;
                }

                // Not enough tokens - wait for refill
                // Calculate wait time based on deficit
                let current = self.tokens.load(AtomicOrdering::Acquire);
                let deficit = chunk.saturating_sub(current);
                let wait_us = self.refill_time_us(deficit).clamp(100, 100_000); // Between 100us and 100ms

                std::thread::sleep(std::time::Duration::from_micros(wait_us));
                total_wait = total_wait.saturating_add(wait_us);
            }
        }

        if total_wait > 0 {
            self.total_wait_us
                .fetch_add(total_wait, AtomicOrdering::Relaxed);
            self.total_throttled
                .fetch_add(bytes, AtomicOrdering::Relaxed);
        }
        total_wait
    }

    /// Try to acquire tokens without blocking
    ///
    /// Returns true if tokens were acquired, false otherwise. Always returns
    /// true when throttling is disabled. Fails only when fewer than `bytes`
    /// tokens are available (never spuriously under contention).
    pub fn try_acquire(&self, bytes: u64) -> bool {
        if !self.enabled.load(AtomicOrdering::Relaxed) {
            return true;
        }

        self.refill();
        self.try_consume(bytes)
    }

    /// Enable or disable throttling
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, AtomicOrdering::Release);
    }

    /// Check if throttling is enabled
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(AtomicOrdering::Relaxed)
    }

    /// Update rate limit (for adaptive control)
    pub fn set_rate(&mut self, rate_bytes_per_sec: u64) {
        self.refill_rate = rate_bytes_per_sec;
    }

    /// Get current available tokens
    ///
    /// Credits any tokens earned since the last refill before reading.
    pub fn available_tokens(&self) -> u64 {
        self.refill();
        self.tokens.load(AtomicOrdering::Relaxed)
    }

    /// Get throttler statistics
    pub fn stats(&self) -> IoThrottlerStats {
        IoThrottlerStats {
            total_throttled_bytes: self.total_throttled.load(AtomicOrdering::Relaxed),
            total_wait_us: self.total_wait_us.load(AtomicOrdering::Relaxed),
            available_tokens: self.available_tokens(),
            rate_bytes_per_sec: self.refill_rate,
            enabled: self.is_enabled(),
        }
    }
}

/// I/O throttler statistics
#[derive(Debug, Clone)]
pub struct IoThrottlerStats {
    /// Total bytes that were throttled (requests that had to wait)
    pub total_throttled_bytes: u64,
    /// Total time spent waiting (microseconds)
    pub total_wait_us: u64,
    /// Currently available tokens
    pub available_tokens: u64,
    /// Configured rate limit
    pub rate_bytes_per_sec: u64,
    /// Whether throttling is enabled
    pub enabled: bool,
}
428
429/// Adaptive I/O controller that adjusts throttling based on system load
430///
431/// Monitors foreground query latency and adjusts compaction I/O
432/// rate to maintain acceptable performance.
433#[derive(Debug)]
434pub struct AdaptiveIoController {
435    /// Base throttler
436    throttler: IoThrottler,
437    /// Target p99 latency for foreground queries (microseconds)
438    target_latency_us: u64,
439    /// Minimum I/O rate (never throttle below this)
440    min_rate: u64,
441    /// Maximum I/O rate (upper bound)
442    max_rate: u64,
443    /// Current rate multiplier (0.1 to 1.0)
444    rate_multiplier: std::sync::atomic::AtomicU64,
445}
446
447impl AdaptiveIoController {
448    /// Create new adaptive controller
449    pub fn new(base_rate: u64, target_latency_us: u64) -> Self {
450        Self {
451            throttler: IoThrottler::new(base_rate, base_rate / 10),
452            target_latency_us,
453            min_rate: base_rate / 10,
454            max_rate: base_rate,
455            rate_multiplier: std::sync::atomic::AtomicU64::new(1000), // 1.0 as fixed point
456        }
457    }
458
459    /// Report observed foreground latency
460    ///
461    /// Used to adjust compaction I/O rate
462    pub fn report_latency(&self, latency_us: u64) {
463        let current_mult = self.rate_multiplier.load(AtomicOrdering::Relaxed);
464
465        let new_mult = if latency_us > self.target_latency_us * 2 {
466            // Latency too high - reduce compaction I/O significantly
467            (current_mult * 8 / 10).max(100) // 0.8x, min 0.1
468        } else if latency_us > self.target_latency_us {
469            // Latency slightly high - reduce slightly
470            (current_mult * 95 / 100).max(100) // 0.95x
471        } else if latency_us < self.target_latency_us / 2 && current_mult < 1000 {
472            // Latency low - can increase compaction I/O
473            (current_mult * 105 / 100).min(1000) // 1.05x, max 1.0
474        } else {
475            current_mult
476        };
477
478        self.rate_multiplier
479            .store(new_mult, AtomicOrdering::Relaxed);
480    }
481
482    /// Get current effective rate
483    pub fn effective_rate(&self) -> u64 {
484        let mult = self.rate_multiplier.load(AtomicOrdering::Relaxed);
485        let rate = (self.max_rate as u128 * mult as u128 / 1000) as u64;
486        rate.max(self.min_rate).min(self.max_rate)
487    }
488
489    /// Acquire I/O tokens with adaptive rate
490    pub fn acquire(&self, bytes: u64) -> u64 {
491        self.throttler.acquire(bytes)
492    }
493
494    /// Get the underlying throttler
495    pub fn throttler(&self) -> &IoThrottler {
496        &self.throttler
497    }
498}
499
500/// A producer that reads edges from an SSTable and sends them to a channel
501pub struct ParallelReader {
502    /// Channel sender for edges
503    sender: Sender<MergeEdge>,
504    /// Source index
505    source_idx: usize,
506    /// Stats
507    stats: Arc<ParallelMergeStats>,
508}
509
510impl ParallelReader {
511    /// Create a new parallel reader
512    pub fn new(
513        sender: Sender<MergeEdge>,
514        source_idx: usize,
515        stats: Arc<ParallelMergeStats>,
516    ) -> Self {
517        Self {
518            sender,
519            source_idx,
520            stats,
521        }
522    }
523
524    /// Send an edge to the merge channel
525    #[allow(clippy::result_large_err)]
526    pub fn send(&self, edge: MergeEdge) -> Result<(), crossbeam_channel::SendError<MergeEdge>> {
527        self.stats.record_read(1);
528        self.sender.send(edge)
529    }
530
531    /// Get the source index
532    pub fn source_idx(&self) -> usize {
533        self.source_idx
534    }
535}
536
537/// Multi-source merge coordinator
538pub struct ParallelMerger {
539    /// Receivers from each input source
540    receivers: Vec<Receiver<MergeEdge>>,
541    /// Configuration (reserved for future use)
542    #[allow(dead_code)]
543    config: ParallelMergeConfig,
544    /// Stats
545    stats: Arc<ParallelMergeStats>,
546}
547
548impl ParallelMerger {
549    /// Create channels for parallel reading
550    pub fn create_channels(
551        num_inputs: usize,
552        config: &ParallelMergeConfig,
553        stats: Arc<ParallelMergeStats>,
554    ) -> (Vec<ParallelReader>, Self) {
555        let mut senders = Vec::with_capacity(num_inputs);
556        let mut receivers = Vec::with_capacity(num_inputs);
557
558        for i in 0..num_inputs {
559            let (tx, rx) = bounded(config.channel_buffer_size);
560            senders.push(ParallelReader::new(tx, i, stats.clone()));
561            receivers.push(rx);
562        }
563
564        let merger = Self {
565            receivers,
566            config: config.clone(),
567            stats,
568        };
569
570        (senders, merger)
571    }
572
573    /// Perform the merge, collecting results into a vector
574    ///
575    /// This uses a binary heap for K-way merge with proper duplicate handling.
576    pub fn merge(self) -> Vec<MergeEdge> {
577        let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::new();
578        let mut result = Vec::new();
579        let mut last_key: Option<(u64, u128)> = None;
580
581        // Initialize heap with first element from each receiver
582        for (idx, rx) in self.receivers.iter().enumerate() {
583            if let Ok(edge) = rx.recv() {
584                heap.push(HeapEntry {
585                    edge,
586                    source_idx: idx,
587                });
588            }
589        }
590
591        while let Some(entry) = heap.pop() {
592            let edge = entry.edge;
593            let source_idx = entry.source_idx;
594
595            // Check for duplicates (same timestamp + edge_id)
596            let key = (edge.timestamp_us, edge.edge_id);
597            let is_duplicate = last_key.map(|k| k == key).unwrap_or(false);
598
599            if is_duplicate {
600                self.stats.record_duplicate();
601            } else if edge.is_tombstone {
602                // For now, we keep tombstones in output but track them
603                self.stats.record_tombstone();
604                result.push(edge.clone());
605                self.stats.record_written(1);
606            } else {
607                result.push(edge.clone());
608                self.stats.record_written(1);
609            }
610
611            last_key = Some(key);
612
613            // Get next element from the same source
614            if let Ok(next_edge) = self.receivers[source_idx].recv() {
615                heap.push(HeapEntry {
616                    edge: next_edge,
617                    source_idx,
618                });
619            }
620        }
621
622        result
623    }
624
625    /// Perform merge with a callback for each output edge
626    pub fn merge_with_callback<F>(self, mut callback: F)
627    where
628        F: FnMut(MergeEdge),
629    {
630        let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::new();
631        let mut last_key: Option<(u64, u128)> = None;
632
633        // Initialize heap
634        for (idx, rx) in self.receivers.iter().enumerate() {
635            if let Ok(edge) = rx.recv() {
636                heap.push(HeapEntry {
637                    edge,
638                    source_idx: idx,
639                });
640            }
641        }
642
643        while let Some(entry) = heap.pop() {
644            let edge = entry.edge;
645            let source_idx = entry.source_idx;
646
647            let key = (edge.timestamp_us, edge.edge_id);
648            let is_duplicate = last_key.map(|k| k == key).unwrap_or(false);
649
650            if is_duplicate {
651                self.stats.record_duplicate();
652            } else {
653                if edge.is_tombstone {
654                    self.stats.record_tombstone();
655                }
656                callback(edge.clone());
657                self.stats.record_written(1);
658            }
659
660            last_key = Some(key);
661
662            // Get next from same source
663            if let Ok(next_edge) = self.receivers[source_idx].recv() {
664                heap.push(HeapEntry {
665                    edge: next_edge,
666                    source_idx,
667                });
668            }
669        }
670    }
671}
672
/// Range partition for sub-compaction
///
/// Half-open timestamp interval `[min_ts, max_ts)`.
#[derive(Debug, Clone)]
pub struct KeyRange {
    /// Minimum timestamp (inclusive)
    pub min_ts: u64,
    /// Maximum timestamp (exclusive)
    pub max_ts: u64,
}

impl KeyRange {
    /// Create a new key range
    pub fn new(min_ts: u64, max_ts: u64) -> Self {
        Self { min_ts, max_ts }
    }

    /// Check if a timestamp falls within this range
    ///
    /// The lower bound is inclusive, the upper bound exclusive.
    pub fn contains(&self, ts: u64) -> bool {
        ts >= self.min_ts && ts < self.max_ts
    }
}

/// Partition key space for parallel sub-compaction
///
/// Splits `[min_ts, max_ts)` into exactly `num_partitions` contiguous,
/// non-overlapping ranges that together cover the whole interval. Range
/// widths differ by at most one: when the span is not evenly divisible the
/// remainder is spread over the leading partitions rather than piled onto the
/// last one, so work stays balanced. If there are more partitions than
/// timestamps, the trailing partitions are empty.
///
/// Returns a single `[min_ts, max_ts)` range when `num_partitions` is zero or
/// the interval is empty/inverted (`max_ts <= min_ts`).
pub fn partition_key_space(min_ts: u64, max_ts: u64, num_partitions: usize) -> Vec<KeyRange> {
    if num_partitions == 0 || max_ts <= min_ts {
        return vec![KeyRange::new(min_ts, max_ts)];
    }

    let span = max_ts - min_ts;
    let count = num_partitions as u64;
    let base_width = span / count;
    // The first `wider` partitions get one extra timestamp each.
    let wider = span % count;

    let mut start = min_ts;
    (0..count)
        .map(|i| {
            let width = if i < wider { base_width + 1 } else { base_width };
            // Cannot overflow: the widths sum to `span`, so `end <= max_ts`.
            let end = start + width;
            let range = KeyRange::new(start, end);
            start = end;
            range
        })
        .collect()
}
715
716/// Builder for setting up a parallel merge operation
717pub struct ParallelMergeBuilder {
718    config: ParallelMergeConfig,
719    stats: Arc<ParallelMergeStats>,
720}
721
722impl ParallelMergeBuilder {
723    /// Create a new builder
724    pub fn new() -> Self {
725        Self {
726            config: ParallelMergeConfig::default(),
727            stats: ParallelMergeStats::new(),
728        }
729    }
730
731    /// Set configuration
732    pub fn config(mut self, config: ParallelMergeConfig) -> Self {
733        self.config = config;
734        self
735    }
736
737    /// Set stats tracker
738    pub fn stats(mut self, stats: Arc<ParallelMergeStats>) -> Self {
739        self.stats = stats;
740        self
741    }
742
743    /// Create channels for the given number of inputs
744    pub fn build(self, num_inputs: usize) -> (Vec<ParallelReader>, ParallelMerger) {
745        ParallelMerger::create_channels(num_inputs, &self.config, self.stats)
746    }
747}
748
749impl Default for ParallelMergeBuilder {
750    fn default() -> Self {
751        Self::new()
752    }
753}
754
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;

    /// Builds an edge with a zeroed payload coming from source 0.
    fn create_test_edge(edge_id: u128, timestamp_us: u64, is_tombstone: bool) -> MergeEdge {
        MergeEdge {
            edge_id,
            timestamp_us,
            is_tombstone,
            data: [0u8; 128],
            source_idx: 0,
        }
    }

    #[test]
    fn test_merge_edge_ordering() {
        let e1 = create_test_edge(1, 1000, false);
        let e2 = create_test_edge(2, 1000, false);
        let e3 = create_test_edge(1, 2000, false);

        // In min-heap, smaller timestamps should come first
        assert!(e1 > e2); // Same timestamp, smaller edge_id is "greater"
        assert!(e1 > e3); // e1 has smaller timestamp, should be "greater" for min-heap
    }

    #[test]
    fn test_parallel_merge_basic() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig {
            reader_threads: 2,
            merger_threads: 1,
            channel_buffer_size: 100,
            read_batch_size: 10,
        };

        let (readers, merger) = ParallelMerger::create_channels(3, &config, stats.clone());

        // Spawn reader threads
        let handles: Vec<_> = readers
            .into_iter()
            .enumerate()
            .map(|(idx, reader)| {
                thread::spawn(move || {
                    for i in 0..10u64 {
                        let edge = MergeEdge {
                            edge_id: (idx * 100 + i as usize) as u128,
                            timestamp_us: i * 100 + idx as u64, // Interleaved timestamps
                            is_tombstone: false,
                            data: [0u8; 128],
                            source_idx: idx,
                        };
                        reader.send(edge).unwrap();
                    }
                })
            })
            .collect();

        // Wait for readers (each sends fewer edges than the channel buffer holds,
        // so they finish without a consumer running)
        for h in handles {
            h.join().unwrap();
        }

        // Perform merge
        let result = merger.merge();

        // Should have 30 edges (10 from each of 3 sources)
        assert_eq!(result.len(), 30);

        // Verify sorted order
        for i in 1..result.len() {
            let prev = &result[i - 1];
            let curr = &result[i];
            assert!(
                prev.timestamp_us < curr.timestamp_us
                    || (prev.timestamp_us == curr.timestamp_us && prev.edge_id < curr.edge_id),
                "Results should be sorted"
            );
        }

        // Check stats
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.edges_read, 30);
        assert_eq!(snapshot.edges_written, 30);
    }

    #[test]
    fn test_parallel_merge_duplicates() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig::default();
        let (readers, merger) = ParallelMerger::create_channels(2, &config, stats.clone());

        // Both sources have the same edge (duplicate)
        let handles: Vec<_> = readers
            .into_iter()
            .map(|reader| {
                thread::spawn(move || {
                    let edge = create_test_edge(42, 1000, false);
                    reader.send(edge).unwrap();
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let result = merger.merge();

        // Should deduplicate
        assert_eq!(result.len(), 1);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.edges_read, 2);
        assert_eq!(snapshot.duplicates_merged, 1);
    }

    #[test]
    fn test_partition_key_space() {
        let partitions = partition_key_space(0, 1000, 4);

        assert_eq!(partitions.len(), 4);
        assert_eq!(partitions[0].min_ts, 0);
        assert_eq!(partitions[0].max_ts, 250);
        assert_eq!(partitions[1].min_ts, 250);
        assert_eq!(partitions[1].max_ts, 500);
        assert_eq!(partitions[2].min_ts, 500);
        assert_eq!(partitions[2].max_ts, 750);
        assert_eq!(partitions[3].min_ts, 750);
        assert_eq!(partitions[3].max_ts, 1000);
    }

    #[test]
    fn test_key_range_contains() {
        let range = KeyRange::new(100, 200);

        assert!(!range.contains(99));
        assert!(range.contains(100));
        assert!(range.contains(150));
        assert!(!range.contains(200));
    }

    #[test]
    fn test_merge_with_callback() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig::default();
        let (readers, merger) = ParallelMerger::create_channels(2, &config, stats.clone());

        let handles: Vec<_> = readers
            .into_iter()
            .enumerate()
            .map(|(idx, reader)| {
                thread::spawn(move || {
                    for i in 0..5u64 {
                        let edge =
                            create_test_edge((idx * 10 + i as usize) as u128, i * 100, false);
                        reader.send(edge).unwrap();
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let count = Arc::new(AtomicUsize::new(0));
        let count_clone = count.clone();

        merger.merge_with_callback(move |_edge| {
            count_clone.fetch_add(1, AtomicOrdering::Relaxed);
        });

        assert_eq!(count.load(AtomicOrdering::Relaxed), 10);
    }

    #[test]
    fn test_parallel_merge_config() {
        let config = ParallelMergeConfig::for_inputs(8);
        assert!(config.reader_threads >= 1);
        assert!(config.merger_threads >= 1);

        let default = ParallelMergeConfig::default();
        assert!(default.channel_buffer_size > 0);
        assert!(default.read_batch_size > 0);
    }

    // I/O Throttler Tests

    #[test]
    fn test_throttler_basic() {
        let throttler = IoThrottler::new(1_000_000, 100_000); // 1 MB/s, 100KB burst

        // Should be able to acquire up to burst size immediately
        assert!(throttler.try_acquire(50_000));
        assert!(throttler.try_acquire(50_000));

        // Burst exhausted, should fail
        assert!(!throttler.try_acquire(50_000));
    }

    #[test]
    fn test_throttler_unlimited() {
        let throttler = IoThrottler::unlimited();

        // Should always succeed
        assert!(throttler.try_acquire(1_000_000_000));
        assert!(!throttler.is_enabled());
    }

    #[test]
    fn test_throttler_blocking_acquire() {
        // Very low rate to ensure blocking
        let throttler = IoThrottler::new(1_000, 100); // 1 KB/s, 100 byte burst

        // Exhaust burst
        assert!(throttler.try_acquire(100));

        // This should block briefly
        let start = std::time::Instant::now();
        let _wait = throttler.acquire(100);
        let elapsed = start.elapsed();

        // Should have waited at least a little (100 bytes at 1KB/s = 100ms)
        // But we're lenient since timing varies
        assert!(elapsed.as_millis() >= 10 || throttler.stats().total_wait_us > 0);
    }

    #[test]
    fn test_throttler_stats() {
        // Deliberately slow refill (1 KB/s = one token per millisecond): the
        // 8000 tokens consumed below would take seconds to come back, so a
        // scheduling delay between the calls cannot make this test flaky.
        let throttler = IoThrottler::new(1_000, 10_000);

        // Get initial tokens
        let initial = throttler.stats().available_tokens;

        // Both fit in the initial burst, so they must succeed
        assert!(throttler.try_acquire(5_000));
        assert!(throttler.try_acquire(3_000));

        let stats = throttler.stats();
        // Should have consumed 8000 tokens (may have refilled slightly)
        assert!(stats.available_tokens < initial);
        assert_eq!(stats.rate_bytes_per_sec, 1_000);
        assert!(stats.enabled);
    }

    #[test]
    fn test_adaptive_controller() {
        let controller = AdaptiveIoController::new(100_000_000, 10_000); // 100 MB/s, 10ms target

        // Report high latency
        controller.report_latency(50_000); // 50ms - way too high
        let rate1 = controller.effective_rate();

        controller.report_latency(50_000);
        let rate2 = controller.effective_rate();

        // Rate should decrease
        assert!(rate2 <= rate1);

        // Report low latency - rate should recover
        for _ in 0..20 {
            controller.report_latency(1_000); // 1ms - very low
        }
        let rate3 = controller.effective_rate();

        // Should have recovered somewhat
        assert!(rate3 >= rate2);
    }
}
1023}