Skip to main content

sochdb_storage/
parallel_merge.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Parallel K-Way Merge for Compaction
19//!
20//! Implements multi-threaded merging of SSTables for faster compaction.
21//!
22//! ## jj.md Task 8: Parallel K-Way Merge
23//!
24//! Goals:
25//! - 3-5x faster compaction with 4+ cores
26//! - Reduced compaction debt accumulation
27//! - Better CPU utilization during background work
28//!
29//! ## Architecture
30//!
31//! ```text
32//! Input SSTables (parallel read):
33//! [SST1] --read--> [Decompressor1] --\
34//! [SST2] --read--> [Decompressor2] ----> [Lock-free Merge Queue]
35//! [SST3] --read--> [Decompressor3] --/
36//!
37//! Merge Phase (parallelized by key range):
38//! Range [0, N/4)    --merge--> [Writer1]
39//! Range [N/4, N/2)  --merge--> [Writer2]
40//! Range [N/2, 3N/4) --merge--> [Writer3]
41//! Range [3N/4, N)   --merge--> [Writer4]
42//! ```
43//!
44//! Reference: RocksDB uses `SubcompactionState` for parallel range compaction
45
46use crossbeam_channel::{Receiver, Sender, bounded};
47use std::cmp::Ordering;
48use std::collections::BinaryHeap;
49use std::sync::Arc;
50use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
51#[cfg(test)]
52use std::thread;
53
/// Edge data for merging (simplified representation)
///
/// Equality and ordering look only at the merge key `(timestamp_us, edge_id)`;
/// `data`, `is_tombstone` and `source_idx` are ignored. The ordering is
/// *reversed* (a smaller key compares as greater) so that the max-oriented
/// [`BinaryHeap`] pops edges in ascending key order.
#[derive(Clone, Debug)]
pub struct MergeEdge {
    /// Edge ID
    pub edge_id: u128,
    /// Timestamp in microseconds
    pub timestamp_us: u64,
    /// Whether this is a tombstone
    pub is_tombstone: bool,
    /// Raw edge bytes (128 bytes)
    pub data: [u8; 128],
    /// Source SSTable index
    pub source_idx: usize,
}

impl PartialEq for MergeEdge {
    fn eq(&self, other: &Self) -> bool {
        self.timestamp_us == other.timestamp_us && self.edge_id == other.edge_id
    }
}

impl Eq for MergeEdge {}

impl PartialOrd for MergeEdge {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for MergeEdge {
    fn cmp(&self, other: &Self) -> Ordering {
        // Operands deliberately swapped: BinaryHeap is a max-heap, so reversing
        // the natural (timestamp, edge_id) order makes it yield the smallest key first.
        (other.timestamp_us, other.edge_id).cmp(&(self.timestamp_us, self.edge_id))
    }
}

/// Entry in the merge heap
///
/// Pairs an edge with the index of the input channel it was received from, so
/// the merger knows which input to pull the next edge from.
struct HeapEntry {
    edge: MergeEdge,
    source_idx: usize,
}

impl PartialEq for HeapEntry {
    fn eq(&self, other: &Self) -> bool {
        // Kept consistent with `Ord` (which includes the source tie-break).
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for HeapEntry {}

impl PartialOrd for HeapEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for HeapEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Primary order: the edge key (smallest key popped first).
        // Tie-break on equal keys: the lower input index is popped first.
        // Without this, BinaryHeap's order among equal elements depends on its
        // internal layout, making duplicate resolution nondeterministic.
        // NOTE(review): the first-popped duplicate is the one the merger keeps;
        // confirm callers order inputs so that index 0 holds the preferred
        // (e.g. newest) version.
        self.edge
            .cmp(&other.edge)
            .then_with(|| other.source_idx.cmp(&self.source_idx))
    }
}
118
119/// Configuration for parallel merge
120#[derive(Debug, Clone)]
121pub struct ParallelMergeConfig {
122    /// Number of reader threads
123    pub reader_threads: usize,
124    /// Number of merger threads (for range partitioning)
125    pub merger_threads: usize,
126    /// Channel buffer size per input
127    pub channel_buffer_size: usize,
128    /// Batch size for reading
129    pub read_batch_size: usize,
130}
131
132impl Default for ParallelMergeConfig {
133    fn default() -> Self {
134        let num_cpus = num_cpus::get();
135        Self {
136            reader_threads: (num_cpus / 2).max(1),
137            merger_threads: (num_cpus / 2).max(1),
138            channel_buffer_size: 1024,
139            read_batch_size: 256,
140        }
141    }
142}
143
144impl ParallelMergeConfig {
145    /// Create config optimized for the given number of input files
146    pub fn for_inputs(num_inputs: usize) -> Self {
147        let num_cpus = num_cpus::get();
148        Self {
149            reader_threads: num_inputs.min(num_cpus),
150            merger_threads: (num_cpus / 2).max(1),
151            channel_buffer_size: 1024,
152            read_batch_size: 256,
153        }
154    }
155}
156
/// Statistics for parallel merge operations
///
/// Every counter is an independent relaxed atomic. Each one is exact on its
/// own, but a [`snapshot`](Self::snapshot) taken while a merge is still running
/// is not a consistent cut across counters.
#[derive(Debug, Default)]
pub struct ParallelMergeStats {
    /// Total edges read from inputs
    pub edges_read: AtomicU64,
    /// Total edges written to output
    pub edges_written: AtomicU64,
    /// Tombstones counted by the merger (the merger in this module still
    /// emits them, so this counts tombstones seen rather than dropped)
    pub tombstones_filtered: AtomicU64,
    /// Duplicate edges merged
    pub duplicates_merged: AtomicU64,
}

impl ParallelMergeStats {
    /// Create a zeroed tracker, already wrapped in an [`Arc`] for sharing
    /// between readers and the merger.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Add `by` to a single counter.
    fn bump(counter: &AtomicU64, by: u64) {
        counter.fetch_add(by, AtomicOrdering::Relaxed);
    }

    /// Count `count` edges as read from the inputs.
    pub fn record_read(&self, count: u64) {
        Self::bump(&self.edges_read, count);
    }

    /// Count `count` edges as written to the output.
    pub fn record_written(&self, count: u64) {
        Self::bump(&self.edges_written, count);
    }

    /// Count one tombstone.
    pub fn record_tombstone(&self) {
        Self::bump(&self.tombstones_filtered, 1);
    }

    /// Count one collapsed duplicate.
    pub fn record_duplicate(&self) {
        Self::bump(&self.duplicates_merged, 1);
    }

    /// Copy the current counter values into a plain struct.
    pub fn snapshot(&self) -> ParallelMergeStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(AtomicOrdering::Relaxed);
        ParallelMergeStatsSnapshot {
            edges_read: read(&self.edges_read),
            edges_written: read(&self.edges_written),
            tombstones_filtered: read(&self.tombstones_filtered),
            duplicates_merged: read(&self.duplicates_merged),
        }
    }
}

/// Point-in-time copy of the [`ParallelMergeStats`] counters.
#[derive(Debug, Clone)]
pub struct ParallelMergeStatsSnapshot {
    /// Total edges read from inputs
    pub edges_read: u64,
    /// Total edges written to output
    pub edges_written: u64,
    /// Tombstones counted by the merger
    pub tombstones_filtered: u64,
    /// Duplicate edges merged
    pub duplicates_merged: u64,
}
216
217// =============================================================================
218// Task 7 Enhancement: I/O Throttling for Compaction
219// =============================================================================
220
/// Token bucket-based I/O throttler for compaction
///
/// ## Purpose
/// Prevents compaction from saturating I/O bandwidth and causing
/// latency spikes for foreground queries.
///
/// ## Algorithm
/// Uses a token bucket with a configurable:
/// - Bucket capacity (burst allowance)
/// - Refill rate (sustained bandwidth limit)
///
/// The rate is fixed per instance; load-based adaptation is done on top of this
/// type by [`AdaptiveIoController`].
///
/// ## Reference
/// Based on RocksDB's `rate_limiter` implementation which uses
/// token bucket for I/O bandwidth control.
#[derive(Debug)]
pub struct IoThrottler {
    /// Available tokens (bytes)
    tokens: AtomicU64,
    /// Maximum bucket capacity
    capacity: u64,
    /// Refill rate (tokens per second)
    refill_rate: u64,
    /// Wall-clock time (microseconds since the Unix epoch) up to which tokens
    /// have been credited
    last_refill: AtomicU64,
    /// Total bytes of requests that had to wait (for stats)
    total_throttled: AtomicU64,
    /// Total wait time (microseconds)
    total_wait_us: AtomicU64,
    /// Whether throttling is enabled
    enabled: std::sync::atomic::AtomicBool,
}

impl IoThrottler {
    /// Shortest back-off sleep used by [`acquire`](Self::acquire), in microseconds.
    const MIN_WAIT_US: u64 = 100;
    /// Longest back-off sleep used by [`acquire`](Self::acquire), in microseconds.
    const MAX_WAIT_US: u64 = 100_000;

    /// Create a new I/O throttler
    ///
    /// The bucket starts full.
    ///
    /// # Arguments
    /// * `rate_bytes_per_sec` - Sustained I/O rate limit
    /// * `burst_bytes` - Maximum burst allowance (bucket capacity)
    pub fn new(rate_bytes_per_sec: u64, burst_bytes: u64) -> Self {
        Self {
            tokens: AtomicU64::new(burst_bytes),
            capacity: burst_bytes,
            refill_rate: rate_bytes_per_sec,
            last_refill: AtomicU64::new(Self::now_us()),
            total_throttled: AtomicU64::new(0),
            total_wait_us: AtomicU64::new(0),
            enabled: std::sync::atomic::AtomicBool::new(true),
        }
    }

    /// Create throttler with sensible defaults for compaction
    ///
    /// Defaults to 100 MB/s sustained, 10 MB burst
    pub fn for_compaction() -> Self {
        Self::new(100 * 1024 * 1024, 10 * 1024 * 1024)
    }

    /// Create throttler that doesn't limit (for testing)
    pub fn unlimited() -> Self {
        let throttler = Self::new(u64::MAX, u64::MAX);
        throttler.enabled.store(false, AtomicOrdering::Relaxed);
        throttler
    }

    /// Current wall-clock time in microseconds since the Unix epoch.
    ///
    /// A clock set before the epoch yields 0 instead of panicking.
    fn now_us() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |since_epoch| {
                u64::try_from(since_epoch.as_micros()).unwrap_or(u64::MAX)
            })
    }

    /// Refill tokens based on elapsed time
    ///
    /// Safe to call concurrently. Exactly one caller credits any given interval.
    /// Sub-token remainders are carried over rather than discarded.
    fn refill(&self) {
        if self.refill_rate == 0 {
            return;
        }
        let now = Self::now_us();
        let last = self.last_refill.load(AtomicOrdering::Acquire);
        if now < last {
            // Wall clock went backwards: re-anchor without crediting anything.
            let _ = self.last_refill.compare_exchange(
                last,
                now,
                AtomicOrdering::AcqRel,
                AtomicOrdering::Acquire,
            );
            return;
        }

        let rate = u128::from(self.refill_rate);
        // tokens = elapsed_seconds * rate, computed in u128 and saturated so huge
        // rates (e.g. `unlimited()`) cannot wrap when narrowed back to u64.
        let tokens_to_add =
            u64::try_from(u128::from(now - last) * rate / 1_000_000).unwrap_or(u64::MAX);
        if tokens_to_add == 0 {
            // Less than one token has accrued. Leave `last_refill` alone so the
            // partial interval keeps accumulating instead of being thrown away.
            return;
        }

        // Advance only by the time the credited tokens represent, keeping any
        // fractional remainder for the next refill.
        let credited_us =
            u64::try_from(u128::from(tokens_to_add) * 1_000_000 / rate).unwrap_or(u64::MAX);
        let new_last = last.saturating_add(credited_us).min(now);

        // Claim the interval. If another thread advanced `last_refill` first, it
        // has credited this interval already and we must not add it twice.
        if self
            .last_refill
            .compare_exchange(last, new_last, AtomicOrdering::AcqRel, AtomicOrdering::Acquire)
            .is_err()
        {
            return;
        }

        // Add atomically so tokens consumed concurrently are not overwritten.
        let capacity = self.capacity;
        let _ = self
            .tokens
            .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |current| {
                Some(current.saturating_add(tokens_to_add).min(capacity))
            });
    }

    /// Back-off sleep for a token deficit, clamped to `[MIN_WAIT_US, MAX_WAIT_US]`.
    fn backoff_us(&self, deficit: u64) -> u64 {
        if self.refill_rate == 0 {
            // Nothing will ever refill. Poll slowly instead of dividing by zero.
            return Self::MAX_WAIT_US;
        }
        let wait = u128::from(deficit) * 1_000_000 / u128::from(self.refill_rate);
        u64::try_from(wait)
            .unwrap_or(u64::MAX)
            .clamp(Self::MIN_WAIT_US, Self::MAX_WAIT_US)
    }

    /// Request I/O tokens, blocking if necessary
    ///
    /// A request larger than the bucket capacity is charged one full bucket,
    /// since the bucket can never hold more than that. With a zero refill rate
    /// and insufficient tokens, this blocks indefinitely.
    ///
    /// Returns the accumulated sleep time in microseconds (0 when disabled or
    /// when enough tokens were already available).
    pub fn acquire(&self, bytes: u64) -> u64 {
        if !self.enabled.load(AtomicOrdering::Relaxed) {
            return 0;
        }

        let needed = bytes.min(self.capacity);
        let mut total_wait = 0u64;

        loop {
            self.refill();

            let current = self.tokens.load(AtomicOrdering::Acquire);
            if current >= needed {
                let consumed = self.tokens.compare_exchange_weak(
                    current,
                    current - needed,
                    AtomicOrdering::AcqRel,
                    AtomicOrdering::Acquire,
                );
                if consumed.is_ok() {
                    if total_wait > 0 {
                        self.total_wait_us
                            .fetch_add(total_wait, AtomicOrdering::Relaxed);
                        self.total_throttled
                            .fetch_add(bytes, AtomicOrdering::Relaxed);
                    }
                    return total_wait;
                }
                // Lost a race or spurious CAS failure: re-check from the top.
                continue;
            }

            // Not enough tokens: sleep roughly until the deficit has refilled.
            let wait_us = self.backoff_us(needed - current);
            std::thread::sleep(std::time::Duration::from_micros(wait_us));
            total_wait = total_wait.saturating_add(wait_us);
        }
    }

    /// Try to acquire tokens without blocking
    ///
    /// Returns true if tokens were acquired (always true when disabled), false
    /// if fewer than `bytes` tokens are available. Contention and spurious CAS
    /// failures are retried internally, so `false` never means "lost a race".
    pub fn try_acquire(&self, bytes: u64) -> bool {
        if !self.enabled.load(AtomicOrdering::Relaxed) {
            return true;
        }

        self.refill();

        self.tokens
            .fetch_update(AtomicOrdering::AcqRel, AtomicOrdering::Acquire, |current| {
                current.checked_sub(bytes)
            })
            .is_ok()
    }

    /// Enable or disable throttling
    pub fn set_enabled(&self, enabled: bool) {
        self.enabled.store(enabled, AtomicOrdering::Release);
    }

    /// Check if throttling is enabled
    pub fn is_enabled(&self) -> bool {
        self.enabled.load(AtomicOrdering::Relaxed)
    }

    /// Update rate limit (for adaptive control)
    pub fn set_rate(&mut self, rate_bytes_per_sec: u64) {
        self.refill_rate = rate_bytes_per_sec;
    }

    /// Get current available tokens (after crediting elapsed time)
    pub fn available_tokens(&self) -> u64 {
        self.refill();
        self.tokens.load(AtomicOrdering::Relaxed)
    }

    /// Get throttler statistics
    pub fn stats(&self) -> IoThrottlerStats {
        IoThrottlerStats {
            total_throttled_bytes: self.total_throttled.load(AtomicOrdering::Relaxed),
            total_wait_us: self.total_wait_us.load(AtomicOrdering::Relaxed),
            available_tokens: self.available_tokens(),
            rate_bytes_per_sec: self.refill_rate,
            enabled: self.is_enabled(),
        }
    }
}

/// I/O throttler statistics
#[derive(Debug, Clone)]
pub struct IoThrottlerStats {
    /// Total bytes of requests that had to wait
    pub total_throttled_bytes: u64,
    /// Total time spent waiting (microseconds)
    pub total_wait_us: u64,
    /// Currently available tokens
    pub available_tokens: u64,
    /// Configured rate limit
    pub rate_bytes_per_sec: u64,
    /// Whether throttling is enabled
    pub enabled: bool,
}
431
432/// Adaptive I/O controller that adjusts throttling based on system load
433///
434/// Monitors foreground query latency and adjusts compaction I/O
435/// rate to maintain acceptable performance.
436#[derive(Debug)]
437pub struct AdaptiveIoController {
438    /// Base throttler
439    throttler: IoThrottler,
440    /// Target p99 latency for foreground queries (microseconds)
441    target_latency_us: u64,
442    /// Minimum I/O rate (never throttle below this)
443    min_rate: u64,
444    /// Maximum I/O rate (upper bound)
445    max_rate: u64,
446    /// Current rate multiplier (0.1 to 1.0)
447    rate_multiplier: std::sync::atomic::AtomicU64,
448}
449
450impl AdaptiveIoController {
451    /// Create new adaptive controller
452    pub fn new(base_rate: u64, target_latency_us: u64) -> Self {
453        Self {
454            throttler: IoThrottler::new(base_rate, base_rate / 10),
455            target_latency_us,
456            min_rate: base_rate / 10,
457            max_rate: base_rate,
458            rate_multiplier: std::sync::atomic::AtomicU64::new(1000), // 1.0 as fixed point
459        }
460    }
461
462    /// Report observed foreground latency
463    ///
464    /// Used to adjust compaction I/O rate
465    pub fn report_latency(&self, latency_us: u64) {
466        let current_mult = self.rate_multiplier.load(AtomicOrdering::Relaxed);
467
468        let new_mult = if latency_us > self.target_latency_us * 2 {
469            // Latency too high - reduce compaction I/O significantly
470            (current_mult * 8 / 10).max(100) // 0.8x, min 0.1
471        } else if latency_us > self.target_latency_us {
472            // Latency slightly high - reduce slightly
473            (current_mult * 95 / 100).max(100) // 0.95x
474        } else if latency_us < self.target_latency_us / 2 && current_mult < 1000 {
475            // Latency low - can increase compaction I/O
476            (current_mult * 105 / 100).min(1000) // 1.05x, max 1.0
477        } else {
478            current_mult
479        };
480
481        self.rate_multiplier
482            .store(new_mult, AtomicOrdering::Relaxed);
483    }
484
485    /// Get current effective rate
486    pub fn effective_rate(&self) -> u64 {
487        let mult = self.rate_multiplier.load(AtomicOrdering::Relaxed);
488        let rate = (self.max_rate as u128 * mult as u128 / 1000) as u64;
489        rate.max(self.min_rate).min(self.max_rate)
490    }
491
492    /// Acquire I/O tokens with adaptive rate
493    pub fn acquire(&self, bytes: u64) -> u64 {
494        self.throttler.acquire(bytes)
495    }
496
497    /// Get the underlying throttler
498    pub fn throttler(&self) -> &IoThrottler {
499        &self.throttler
500    }
501}
502
503/// A producer that reads edges from an SSTable and sends them to a channel
504pub struct ParallelReader {
505    /// Channel sender for edges
506    sender: Sender<MergeEdge>,
507    /// Source index
508    source_idx: usize,
509    /// Stats
510    stats: Arc<ParallelMergeStats>,
511}
512
513impl ParallelReader {
514    /// Create a new parallel reader
515    pub fn new(
516        sender: Sender<MergeEdge>,
517        source_idx: usize,
518        stats: Arc<ParallelMergeStats>,
519    ) -> Self {
520        Self {
521            sender,
522            source_idx,
523            stats,
524        }
525    }
526
527    /// Send an edge to the merge channel
528    #[allow(clippy::result_large_err)]
529    pub fn send(&self, edge: MergeEdge) -> Result<(), crossbeam_channel::SendError<MergeEdge>> {
530        self.stats.record_read(1);
531        self.sender.send(edge)
532    }
533
534    /// Get the source index
535    pub fn source_idx(&self) -> usize {
536        self.source_idx
537    }
538}
539
540/// Multi-source merge coordinator
541pub struct ParallelMerger {
542    /// Receivers from each input source
543    receivers: Vec<Receiver<MergeEdge>>,
544    /// Configuration (reserved for future use)
545    #[allow(dead_code)]
546    config: ParallelMergeConfig,
547    /// Stats
548    stats: Arc<ParallelMergeStats>,
549}
550
551impl ParallelMerger {
552    /// Create channels for parallel reading
553    pub fn create_channels(
554        num_inputs: usize,
555        config: &ParallelMergeConfig,
556        stats: Arc<ParallelMergeStats>,
557    ) -> (Vec<ParallelReader>, Self) {
558        let mut senders = Vec::with_capacity(num_inputs);
559        let mut receivers = Vec::with_capacity(num_inputs);
560
561        for i in 0..num_inputs {
562            let (tx, rx) = bounded(config.channel_buffer_size);
563            senders.push(ParallelReader::new(tx, i, stats.clone()));
564            receivers.push(rx);
565        }
566
567        let merger = Self {
568            receivers,
569            config: config.clone(),
570            stats,
571        };
572
573        (senders, merger)
574    }
575
576    /// Perform the merge, collecting results into a vector
577    ///
578    /// This uses a binary heap for K-way merge with proper duplicate handling.
579    pub fn merge(self) -> Vec<MergeEdge> {
580        let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::new();
581        let mut result = Vec::new();
582        let mut last_key: Option<(u64, u128)> = None;
583
584        // Initialize heap with first element from each receiver
585        for (idx, rx) in self.receivers.iter().enumerate() {
586            if let Ok(edge) = rx.recv() {
587                heap.push(HeapEntry {
588                    edge,
589                    source_idx: idx,
590                });
591            }
592        }
593
594        while let Some(entry) = heap.pop() {
595            let edge = entry.edge;
596            let source_idx = entry.source_idx;
597
598            // Check for duplicates (same timestamp + edge_id)
599            let key = (edge.timestamp_us, edge.edge_id);
600            let is_duplicate = last_key.map(|k| k == key).unwrap_or(false);
601
602            if is_duplicate {
603                self.stats.record_duplicate();
604            } else if edge.is_tombstone {
605                // For now, we keep tombstones in output but track them
606                self.stats.record_tombstone();
607                result.push(edge.clone());
608                self.stats.record_written(1);
609            } else {
610                result.push(edge.clone());
611                self.stats.record_written(1);
612            }
613
614            last_key = Some(key);
615
616            // Get next element from the same source
617            if let Ok(next_edge) = self.receivers[source_idx].recv() {
618                heap.push(HeapEntry {
619                    edge: next_edge,
620                    source_idx,
621                });
622            }
623        }
624
625        result
626    }
627
628    /// Perform merge with a callback for each output edge
629    pub fn merge_with_callback<F>(self, mut callback: F)
630    where
631        F: FnMut(MergeEdge),
632    {
633        let mut heap: BinaryHeap<HeapEntry> = BinaryHeap::new();
634        let mut last_key: Option<(u64, u128)> = None;
635
636        // Initialize heap
637        for (idx, rx) in self.receivers.iter().enumerate() {
638            if let Ok(edge) = rx.recv() {
639                heap.push(HeapEntry {
640                    edge,
641                    source_idx: idx,
642                });
643            }
644        }
645
646        while let Some(entry) = heap.pop() {
647            let edge = entry.edge;
648            let source_idx = entry.source_idx;
649
650            let key = (edge.timestamp_us, edge.edge_id);
651            let is_duplicate = last_key.map(|k| k == key).unwrap_or(false);
652
653            if is_duplicate {
654                self.stats.record_duplicate();
655            } else {
656                if edge.is_tombstone {
657                    self.stats.record_tombstone();
658                }
659                callback(edge.clone());
660                self.stats.record_written(1);
661            }
662
663            last_key = Some(key);
664
665            // Get next from same source
666            if let Ok(next_edge) = self.receivers[source_idx].recv() {
667                heap.push(HeapEntry {
668                    edge: next_edge,
669                    source_idx,
670                });
671            }
672        }
673    }
674}
675
/// Range partition for sub-compaction
///
/// Half-open timestamp interval `[min_ts, max_ts)`.
#[derive(Debug, Clone)]
pub struct KeyRange {
    /// Minimum timestamp (inclusive)
    pub min_ts: u64,
    /// Maximum timestamp (exclusive)
    pub max_ts: u64,
}

impl KeyRange {
    /// Create a new key range
    pub fn new(min_ts: u64, max_ts: u64) -> Self {
        Self { min_ts, max_ts }
    }

    /// Check if a timestamp falls within this range (`min_ts <= ts < max_ts`)
    pub fn contains(&self, ts: u64) -> bool {
        (self.min_ts..self.max_ts).contains(&ts)
    }
}

/// Partition key space for parallel sub-compaction
///
/// Splits `[min_ts, max_ts)` into `num_partitions` contiguous ranges that
/// exactly cover it, with sizes differing by at most one. When the span is
/// narrower than `num_partitions`, some ranges are empty.
///
/// If `num_partitions` is 0 or the span is empty (`max_ts <= min_ts`), returns
/// the single range `[min_ts, max_ts)` unchanged.
pub fn partition_key_space(min_ts: u64, max_ts: u64, num_partitions: usize) -> Vec<KeyRange> {
    if num_partitions == 0 || max_ts <= min_ts {
        return vec![KeyRange::new(min_ts, max_ts)];
    }

    let span = u128::from(max_ts - min_ts);
    let parts = num_partitions as u128; // usize -> u128 is lossless

    // Boundary i lies at min + floor(i * span / parts). Spreading the remainder
    // proportionally keeps partitions balanced instead of piling it all onto
    // the last one. Since i <= parts, the offset is <= span and fits in u64.
    let boundary = |i: usize| min_ts + (i as u128 * span / parts) as u64;

    (0..num_partitions)
        .map(|i| KeyRange::new(boundary(i), boundary(i + 1)))
        .collect()
}
718
719/// Builder for setting up a parallel merge operation
720pub struct ParallelMergeBuilder {
721    config: ParallelMergeConfig,
722    stats: Arc<ParallelMergeStats>,
723}
724
725impl ParallelMergeBuilder {
726    /// Create a new builder
727    pub fn new() -> Self {
728        Self {
729            config: ParallelMergeConfig::default(),
730            stats: ParallelMergeStats::new(),
731        }
732    }
733
734    /// Set configuration
735    pub fn config(mut self, config: ParallelMergeConfig) -> Self {
736        self.config = config;
737        self
738    }
739
740    /// Set stats tracker
741    pub fn stats(mut self, stats: Arc<ParallelMergeStats>) -> Self {
742        self.stats = stats;
743        self
744    }
745
746    /// Create channels for the given number of inputs
747    pub fn build(self, num_inputs: usize) -> (Vec<ParallelReader>, ParallelMerger) {
748        ParallelMerger::create_channels(num_inputs, &self.config, self.stats)
749    }
750}
751
752impl Default for ParallelMergeBuilder {
753    fn default() -> Self {
754        Self::new()
755    }
756}
757
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;

    fn create_test_edge(edge_id: u128, timestamp_us: u64, is_tombstone: bool) -> MergeEdge {
        MergeEdge {
            edge_id,
            timestamp_us,
            is_tombstone,
            data: [0u8; 128],
            source_idx: 0,
        }
    }

    #[test]
    fn test_merge_edge_ordering() {
        let e1 = create_test_edge(1, 1000, false);
        let e2 = create_test_edge(2, 1000, false);
        let e3 = create_test_edge(1, 2000, false);

        // MergeEdge ordering is reversed for the max-heap: the smaller key
        // compares as "greater" so it is popped first.
        assert!(e1 > e2); // Same timestamp, smaller edge_id
        assert!(e1 > e3); // Smaller timestamp
    }

    #[test]
    fn test_parallel_merge_basic() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig {
            reader_threads: 2,
            merger_threads: 1,
            channel_buffer_size: 100,
            read_batch_size: 10,
        };

        let (readers, merger) = ParallelMerger::create_channels(3, &config, stats.clone());

        // Spawn reader threads
        let handles: Vec<_> = readers
            .into_iter()
            .enumerate()
            .map(|(idx, reader)| {
                thread::spawn(move || {
                    for i in 0..10u64 {
                        let edge = MergeEdge {
                            edge_id: (idx * 100 + i as usize) as u128,
                            timestamp_us: i * 100 + idx as u64, // Interleaved timestamps
                            is_tombstone: false,
                            data: [0u8; 128],
                            source_idx: idx,
                        };
                        reader.send(edge).unwrap();
                    }
                })
            })
            .collect();

        // Wait for readers
        for h in handles {
            h.join().unwrap();
        }

        // Perform merge
        let result = merger.merge();

        // Should have 30 edges (10 from each of 3 sources)
        assert_eq!(result.len(), 30);

        // Verify sorted order
        for i in 1..result.len() {
            let prev = &result[i - 1];
            let curr = &result[i];
            assert!(
                prev.timestamp_us < curr.timestamp_us
                    || (prev.timestamp_us == curr.timestamp_us && prev.edge_id < curr.edge_id),
                "Results should be sorted"
            );
        }

        // Check stats
        let snapshot = stats.snapshot();
        assert_eq!(snapshot.edges_read, 30);
        assert_eq!(snapshot.edges_written, 30);
    }

    #[test]
    fn test_parallel_merge_duplicates() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig::default();
        let (readers, merger) = ParallelMerger::create_channels(2, &config, stats.clone());

        // Both sources have the same edge (duplicate)
        let handles: Vec<_> = readers
            .into_iter()
            .map(|reader| {
                thread::spawn(move || {
                    let edge = create_test_edge(42, 1000, false);
                    reader.send(edge).unwrap();
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let result = merger.merge();

        // Should deduplicate
        assert_eq!(result.len(), 1);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.edges_read, 2);
        assert_eq!(snapshot.duplicates_merged, 1);
    }

    #[test]
    fn test_partition_key_space() {
        let partitions = partition_key_space(0, 1000, 4);

        assert_eq!(partitions.len(), 4);
        assert_eq!(partitions[0].min_ts, 0);
        assert_eq!(partitions[0].max_ts, 250);
        assert_eq!(partitions[1].min_ts, 250);
        assert_eq!(partitions[1].max_ts, 500);
        assert_eq!(partitions[2].min_ts, 500);
        assert_eq!(partitions[2].max_ts, 750);
        assert_eq!(partitions[3].min_ts, 750);
        assert_eq!(partitions[3].max_ts, 1000);
    }

    #[test]
    fn test_key_range_contains() {
        let range = KeyRange::new(100, 200);

        assert!(!range.contains(99));
        assert!(range.contains(100));
        assert!(range.contains(150));
        assert!(!range.contains(200));
    }

    #[test]
    fn test_merge_with_callback() {
        let stats = ParallelMergeStats::new();
        let config = ParallelMergeConfig::default();
        let (readers, merger) = ParallelMerger::create_channels(2, &config, stats.clone());

        let handles: Vec<_> = readers
            .into_iter()
            .enumerate()
            .map(|(idx, reader)| {
                thread::spawn(move || {
                    for i in 0..5u64 {
                        let edge =
                            create_test_edge((idx * 10 + i as usize) as u128, i * 100, false);
                        reader.send(edge).unwrap();
                    }
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        let count = Arc::new(AtomicUsize::new(0));
        let count_clone = count.clone();

        merger.merge_with_callback(move |_edge| {
            count_clone.fetch_add(1, AtomicOrdering::Relaxed);
        });

        assert_eq!(count.load(AtomicOrdering::Relaxed), 10);
    }

    #[test]
    fn test_parallel_merge_config() {
        let config = ParallelMergeConfig::for_inputs(8);
        assert!(config.reader_threads >= 1);
        assert!(config.merger_threads >= 1);

        let default = ParallelMergeConfig::default();
        assert!(default.channel_buffer_size > 0);
        assert!(default.read_batch_size > 0);
    }

    // I/O Throttler Tests

    #[test]
    fn test_throttler_basic() {
        let throttler = IoThrottler::new(1_000_000, 100_000); // 1 MB/s, 100KB burst

        // Should be able to acquire up to burst size immediately
        assert!(throttler.try_acquire(50_000));
        assert!(throttler.try_acquire(50_000));

        // Burst exhausted, should fail
        assert!(!throttler.try_acquire(50_000));
    }

    #[test]
    fn test_throttler_unlimited() {
        let throttler = IoThrottler::unlimited();

        // Should always succeed
        assert!(throttler.try_acquire(1_000_000_000));
        assert!(!throttler.is_enabled());
    }

    #[test]
    fn test_throttler_blocking_acquire() {
        // Very low rate to ensure blocking
        let throttler = IoThrottler::new(1_000, 100); // 1 KB/s, 100 byte burst

        // Exhaust burst
        assert!(throttler.try_acquire(100));

        // This should block briefly
        let start = std::time::Instant::now();
        let _wait = throttler.acquire(100);
        let elapsed = start.elapsed();

        // Should have waited at least a little (100 bytes at 1KB/s = 100ms)
        // But we're lenient since timing varies
        assert!(elapsed.as_millis() >= 10 || throttler.stats().total_wait_us > 0);
    }

    #[test]
    fn test_throttler_stats() {
        // Slow refill (1 byte/ms): the time elapsed between calls cannot top the
        // bucket back up to `initial`, which made a 10 MB/s rate flaky here.
        let throttler = IoThrottler::new(1_000, 10_000);

        // Get initial tokens
        let initial = throttler.stats().available_tokens;

        throttler.try_acquire(5_000);
        throttler.try_acquire(3_000);

        let stats = throttler.stats();
        // Should have consumed 8000 tokens (may have refilled a few)
        assert!(stats.available_tokens < initial);
        assert_eq!(stats.rate_bytes_per_sec, 1_000);
        assert!(stats.enabled);
    }

    #[test]
    fn test_adaptive_controller() {
        let controller = AdaptiveIoController::new(100_000_000, 10_000); // 100 MB/s, 10ms target

        // Report high latency
        controller.report_latency(50_000); // 50ms - way too high
        let rate1 = controller.effective_rate();

        controller.report_latency(50_000);
        let rate2 = controller.effective_rate();

        // Rate should decrease
        assert!(rate2 <= rate1);

        // Report low latency - rate should recover
        for _ in 0..20 {
            controller.report_latency(1_000); // 1ms - very low
        }
        let rate3 = controller.effective_rate();

        // Should have recovered somewhat
        assert!(rate3 >= rate2);
    }
}