Skip to main content

oxirs_arq/
parallel_executor_queue.rs

//! Parallel scan iterator and work-stealing queue.
//!
//! [`ParallelScanIterator`] is the entry point for partitioned dataset scans
//! (at present it issues a single scan over the whole dataset), while
//! [`WorkStealingQueue<T>`] provides dynamic load balancing across worker
//! threads for the parallel query executor.
6
7use crate::algebra::{Term as AlgebraTerm, TriplePattern};
8use crate::executor::Dataset;
9use anyhow::Result;
10use parking_lot::Mutex;
11use std::collections::VecDeque;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14
15/// Parallel scan iterator for large datasets
16pub struct ParallelScanIterator<'a> {
17    dataset: &'a dyn Dataset,
18    pattern: TriplePattern,
19    #[allow(dead_code)]
20    chunk_size: usize,
21}
22
23impl<'a> ParallelScanIterator<'a> {
24    pub fn new(dataset: &'a dyn Dataset, pattern: TriplePattern, chunk_size: usize) -> Self {
25        Self {
26            dataset,
27            pattern,
28            chunk_size,
29        }
30    }
31
32    /// Scan dataset in parallel chunks
33    pub fn par_scan(&self) -> Result<Vec<(AlgebraTerm, AlgebraTerm, AlgebraTerm)>> {
34        // In a real implementation, this would partition the dataset
35        // and scan chunks in parallel
36        self.dataset.find_triples(&self.pattern)
37    }
38}
39
40/// Work-stealing queue for dynamic load balancing
41pub struct WorkStealingQueue<T: Send + Sync> {
42    queues: Vec<Arc<Mutex<VecDeque<T>>>>,
43    thread_count: usize,
44    work_counters: Vec<AtomicUsize>,
45    global_work_count: AtomicUsize,
46}
47
48impl<T: Send + Sync> WorkStealingQueue<T> {
49    pub fn new(thread_count: usize) -> Self {
50        let mut queues = Vec::with_capacity(thread_count);
51        let mut work_counters = Vec::with_capacity(thread_count);
52
53        for _ in 0..thread_count {
54            queues.push(Arc::new(Mutex::new(VecDeque::new())));
55            work_counters.push(AtomicUsize::new(0));
56        }
57
58        Self {
59            queues,
60            thread_count,
61            work_counters,
62            global_work_count: AtomicUsize::new(0),
63        }
64    }
65
66    /// Push work to a specific thread's queue
67    pub fn push(&self, thread_id: usize, work: T) {
68        let queue_id = thread_id % self.thread_count;
69        if let Some(queue) = self.queues.get(queue_id) {
70            queue.lock().push_back(work);
71            self.work_counters[queue_id].fetch_add(1, Ordering::Relaxed);
72            self.global_work_count.fetch_add(1, Ordering::Relaxed);
73        }
74    }
75
76    /// Push work to the least loaded queue
77    pub fn push_balanced(&self, work: T) {
78        let mut min_load = usize::MAX;
79        let mut best_queue = 0;
80
81        // Find the queue with minimum load
82        for (i, counter) in self.work_counters.iter().enumerate() {
83            let load = counter.load(Ordering::Relaxed);
84            if load < min_load {
85                min_load = load;
86                best_queue = i;
87            }
88        }
89
90        self.push(best_queue, work);
91    }
92
93    /// Try to get work, stealing if necessary
94    pub fn steal(&self, thread_id: usize) -> Option<T> {
95        // Try own queue first (LIFO for cache locality)
96        if let Some(queue) = self.queues.get(thread_id) {
97            if let Some(mut q) = queue.try_lock() {
98                if let Some(work) = q.pop_back() {
99                    self.work_counters[thread_id].fetch_sub(1, Ordering::Relaxed);
100                    self.global_work_count.fetch_sub(1, Ordering::Relaxed);
101                    return Some(work);
102                }
103            }
104        }
105
106        // Try to steal from others (FIFO to avoid conflicts)
107        let start = (thread_id + 1) % self.thread_count;
108        for i in 0..self.thread_count {
109            let target = (start + i) % self.thread_count;
110            if target != thread_id {
111                if let Some(queue) = self.queues.get(target) {
112                    if let Some(mut q) = queue.try_lock() {
113                        if let Some(work) = q.pop_front() {
114                            self.work_counters[target].fetch_sub(1, Ordering::Relaxed);
115                            self.global_work_count.fetch_sub(1, Ordering::Relaxed);
116                            return Some(work);
117                        }
118                    }
119                }
120            }
121        }
122
123        None
124    }
125
126    /// Get total pending work count
127    pub fn pending_work(&self) -> usize {
128        self.global_work_count.load(Ordering::Relaxed)
129    }
130
131    /// Check if all queues are empty
132    pub fn is_empty(&self) -> bool {
133        self.pending_work() == 0
134    }
135
136    /// Get load distribution across threads
137    pub fn get_load_distribution(&self) -> Vec<usize> {
138        self.work_counters
139            .iter()
140            .map(|counter| counter.load(Ordering::Relaxed))
141            .collect()
142    }
143
144    /// Drain all work from all queues
145    pub fn drain_all(&self) -> Vec<T> {
146        let mut all_work = Vec::new();
147
148        for (i, queue) in self.queues.iter().enumerate() {
149            {
150                let mut q = queue.lock();
151                while let Some(work) = q.pop_front() {
152                    all_work.push(work);
153                }
154            }
155            self.work_counters[i].store(0, Ordering::Relaxed);
156        }
157
158        self.global_work_count.store(0, Ordering::Relaxed);
159        all_work
160    }
161}