oxirs_arq/
parallel_executor_queue.rs1use crate::algebra::{Term as AlgebraTerm, TriplePattern};
8use crate::executor::Dataset;
9use anyhow::Result;
10use parking_lot::Mutex;
11use std::collections::VecDeque;
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::Arc;
14
15pub struct ParallelScanIterator<'a> {
17 dataset: &'a dyn Dataset,
18 pattern: TriplePattern,
19 #[allow(dead_code)]
20 chunk_size: usize,
21}
22
23impl<'a> ParallelScanIterator<'a> {
24 pub fn new(dataset: &'a dyn Dataset, pattern: TriplePattern, chunk_size: usize) -> Self {
25 Self {
26 dataset,
27 pattern,
28 chunk_size,
29 }
30 }
31
32 pub fn par_scan(&self) -> Result<Vec<(AlgebraTerm, AlgebraTerm, AlgebraTerm)>> {
34 self.dataset.find_triples(&self.pattern)
37 }
38}
39
40pub struct WorkStealingQueue<T: Send + Sync> {
42 queues: Vec<Arc<Mutex<VecDeque<T>>>>,
43 thread_count: usize,
44 work_counters: Vec<AtomicUsize>,
45 global_work_count: AtomicUsize,
46}
47
48impl<T: Send + Sync> WorkStealingQueue<T> {
49 pub fn new(thread_count: usize) -> Self {
50 let mut queues = Vec::with_capacity(thread_count);
51 let mut work_counters = Vec::with_capacity(thread_count);
52
53 for _ in 0..thread_count {
54 queues.push(Arc::new(Mutex::new(VecDeque::new())));
55 work_counters.push(AtomicUsize::new(0));
56 }
57
58 Self {
59 queues,
60 thread_count,
61 work_counters,
62 global_work_count: AtomicUsize::new(0),
63 }
64 }
65
66 pub fn push(&self, thread_id: usize, work: T) {
68 let queue_id = thread_id % self.thread_count;
69 if let Some(queue) = self.queues.get(queue_id) {
70 queue.lock().push_back(work);
71 self.work_counters[queue_id].fetch_add(1, Ordering::Relaxed);
72 self.global_work_count.fetch_add(1, Ordering::Relaxed);
73 }
74 }
75
76 pub fn push_balanced(&self, work: T) {
78 let mut min_load = usize::MAX;
79 let mut best_queue = 0;
80
81 for (i, counter) in self.work_counters.iter().enumerate() {
83 let load = counter.load(Ordering::Relaxed);
84 if load < min_load {
85 min_load = load;
86 best_queue = i;
87 }
88 }
89
90 self.push(best_queue, work);
91 }
92
93 pub fn steal(&self, thread_id: usize) -> Option<T> {
95 if let Some(queue) = self.queues.get(thread_id) {
97 if let Some(mut q) = queue.try_lock() {
98 if let Some(work) = q.pop_back() {
99 self.work_counters[thread_id].fetch_sub(1, Ordering::Relaxed);
100 self.global_work_count.fetch_sub(1, Ordering::Relaxed);
101 return Some(work);
102 }
103 }
104 }
105
106 let start = (thread_id + 1) % self.thread_count;
108 for i in 0..self.thread_count {
109 let target = (start + i) % self.thread_count;
110 if target != thread_id {
111 if let Some(queue) = self.queues.get(target) {
112 if let Some(mut q) = queue.try_lock() {
113 if let Some(work) = q.pop_front() {
114 self.work_counters[target].fetch_sub(1, Ordering::Relaxed);
115 self.global_work_count.fetch_sub(1, Ordering::Relaxed);
116 return Some(work);
117 }
118 }
119 }
120 }
121 }
122
123 None
124 }
125
126 pub fn pending_work(&self) -> usize {
128 self.global_work_count.load(Ordering::Relaxed)
129 }
130
131 pub fn is_empty(&self) -> bool {
133 self.pending_work() == 0
134 }
135
136 pub fn get_load_distribution(&self) -> Vec<usize> {
138 self.work_counters
139 .iter()
140 .map(|counter| counter.load(Ordering::Relaxed))
141 .collect()
142 }
143
144 pub fn drain_all(&self) -> Vec<T> {
146 let mut all_work = Vec::new();
147
148 for (i, queue) in self.queues.iter().enumerate() {
149 {
150 let mut q = queue.lock();
151 while let Some(work) = q.pop_front() {
152 all_work.push(work);
153 }
154 }
155 self.work_counters[i].store(0, Ordering::Relaxed);
156 }
157
158 self.global_work_count.store(0, Ordering::Relaxed);
159 all_work
160 }
161}