1use std::marker::PhantomData;
2use std::ops::Drop;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5use std::thread;
6
7extern crate lace_ws_macros;
8pub use lace_ws_macros::lace_task;
9
10#[cfg(any(feature = "thread_spread", feature = "thread_nospread"))]
11extern crate hwloc2;
12#[cfg(any(feature = "thread_spread", feature = "thread_nospread"))]
13use hwloc2::{CpuBindFlags, ObjectType, Topology};
14
15#[cfg(feature = "numa_awareness")]
16extern crate hwloc2;
17#[cfg(feature = "numa_awareness")]
18use hwloc2::{CpuBindFlags, ObjectType, Topology};
19
20#[cfg(feature = "metrics")]
21mod metrics;
22#[cfg(feature = "metrics")]
23use metrics::Metrics;
24
25mod arena;
26use arena::picked::Arena;
27mod common;
28use common::{Function, THIEF_FINISHED, THIEF_NONE};
29mod prng;
30mod task;
31use task::picked::{Task, TypeErased as TypeErasedTask};
32
33#[cfg(not(feature = "chase_lev"))]
34mod lace_deque;
35#[cfg(not(feature = "chase_lev"))]
36use lace_deque::{Pop, Steal, Stealer, Worker as Deque};
37#[cfg(not(feature = "chase_lev"))]
38mod buffer;
39
40#[cfg(feature = "chase_lev")]
41extern crate crossbeam_deque;
42#[cfg(feature = "chase_lev")]
43use arena::picked::ArenaBox;
44#[cfg(feature = "chase_lev")]
45use crossbeam_deque::{Steal, Stealer, Worker as Deque};
46#[cfg(feature = "chase_lev")]
47use std::sync::atomic::AtomicUsize;
48
/// Worker-tagged logging helper: emits `[WORKER <id>]: <message>` through `log!`.
///
/// `$self` must be an expression with an `id` field (e.g. a `Worker`).
/// The three-or-more-argument form builds the message with `format!` first.
/// NOTE(review): `log!` is not defined in this section — it must be in scope
/// wherever `wlog!` is expanded.
#[allow(unused_macros)]
macro_rules! wlog {
    ($self:expr, $msg:expr) => {
        log!("[WORKER {}]: {}", ($self).id, $msg);
    };
    ($self:expr, $fmt:expr, $($args:expr),*) => {
        log!("[WORKER {}]: {}", ($self).id, format!($fmt, $($args),*));
    };
}
58
59#[cfg(not(feature = "chase_lev"))]
60type TaskDeque = Deque<TypeErasedTask>;
61#[cfg(not(feature = "chase_lev"))]
62type TaskStealer = Stealer<TypeErasedTask>;
63#[cfg(feature = "chase_lev")]
64type TaskDeque = Deque<(*mut TypeErasedTask, Arc<AtomicUsize>)>;
65#[cfg(feature = "chase_lev")]
66type TaskStealer = Stealer<(*mut TypeErasedTask, Arc<AtomicUsize>)>;
67
/// Per-thread scheduler state: one task deque, handles to steal from every
/// other worker, and the arena that task storage is allocated from.
pub struct Worker {
    /// Index of this worker; used as the identity passed to `steal`, as the
    /// PRNG seed, and excluded from `steal_random`'s victim choice.
    id: usize,
    /// Pool-wide run flag; the background loop in `thread` runs while it is
    /// true and `Lace::stop` clears it.
    keep_going: Arc<AtomicBool>,
    /// Arena that `Task::new` / `alloc` draw task storage from.
    arena: Arena,
    /// Pseudo-random generator used by `steal_random` to pick victims.
    prng: prng::Lfsr,
    /// This worker's own deque: `spawn` pushes, `sync` pops.
    queue: TaskDeque,
    /// One stealer per worker, indexed by worker id (`Lace::init` gives every
    /// worker the full list, its own handle included).
    steal_handles: Vec<TaskStealer>,
    /// Pool of thief-slot handle pairs reused by `spawn` (chase_lev only).
    #[cfg(feature = "chase_lev")]
    recycle_thief: Vec<(Arc<AtomicUsize>, Arc<AtomicUsize>)>,
    /// Counters for this worker (metrics feature only).
    #[cfg(feature = "metrics")]
    metrics: Arc<Metrics>,
}
80impl Worker {
81 pub fn new(
82 id: usize,
83 keep_going: Arc<AtomicBool>,
84 #[cfg(feature = "metrics")] metrics: Arc<Metrics>,
85 ) -> (Self, TaskStealer) {
86 #[cfg(not(feature = "chase_lev"))]
87 let (queue, stealer) = lace_deque::deque(
88 common::BUFFER_SIZE,
89 #[cfg(feature = "metrics")]
90 metrics.clone(),
91 );
92 #[cfg(feature = "chase_lev")]
93 let queue = Deque::new_lifo();
94 #[cfg(feature = "chase_lev")]
95 let stealer = queue.stealer();
96
97 (
98 Self {
99 id,
100 arena: Arena::new(),
101 queue,
102 prng: prng::Lfsr::new(id),
103 steal_handles: Vec::new(),
104 keep_going,
105 #[cfg(feature = "chase_lev")]
106 recycle_thief: Vec::new(),
107 #[cfg(feature = "metrics")]
108 metrics: metrics.clone(),
109 },
110 stealer,
111 )
112 }
113
114 #[cfg(test)]
115 pub fn mock() -> Self {
116 Self::new(
117 0,
118 Arc::new(AtomicBool::from(true)),
119 #[cfg(feature = "metrics")]
120 Arc::new(Metrics::default()),
121 )
122 .0
123 }
124}
125
/// Handle returned by `Worker::spawn`; pass it to `Worker::sync` to obtain the
/// spawned task's output.
///
/// `#[must_use]` because dropping it without syncing leaves the spawned task
/// unsynchronised. The phantom fields tie the token to the task's lifetime and
/// to its input/output types `I` and `O`.
#[must_use]
pub struct SpawnToken<'task, I, O> {
    /// Arena pointer to the spawned task (chase_lev only).
    #[cfg(feature = "chase_lev")]
    task: *mut TypeErasedTask,
    /// Owner's handle to the thief slot shared with the queued item (chase_lev only).
    #[cfg(feature = "chase_lev")]
    thief: Arc<AtomicUsize>,
    _lt: PhantomData<&'task ()>,
    _i: PhantomData<I>,
    _o: PhantomData<O>,
}
136
// Scheduling operations for the default Lace-deque backend.
#[cfg(not(feature = "chase_lev"))]
impl Worker {
    /// Slow path of `sync`/`join`: the fast-path `sync_fast` found nothing.
    ///
    /// Pops the bottom of the deque. If the task is still local it runs
    /// inline. If it was stolen, this waits for the thief's id to appear,
    /// then leapfrogs (steals back from that thief) until the deque reports
    /// `THIEF_FINISHED`, and finally takes the stolen task's output.
    /// NOTE(review): the memory ordering of `thief_flag` is defined in
    /// `lace_deque` (not visible here) — it must provide acquire semantics
    /// when it returns `THIEF_FINISHED` for `take_output` to be race-free.
    #[cold]
    #[inline(never)]
    fn sync_slow<I, O>(&mut self) -> O {
        match self.queue.pop() {
            Pop::Work(task) => {
                let task: Task<I, O> = task.unerase();
                task.execute(self)
            }
            Pop::Stolen(i) => {
                // Spin until the thief has published which worker it is.
                let mut thief = THIEF_NONE;
                while thief == THIEF_NONE {
                    thief = self.queue.thief_flag(i);
                }
                #[cfg(feature = "steal_backoff")]
                let mut backoff: usize = 0;
                // The cfg applies only to this `let`; the loop below always exists.
                #[cfg(feature = "leapfrog_random")]
                let mut attempts: usize = 32;
                while thief != THIEF_FINISHED {
                    #[cfg(not(feature = "leapfrog_random"))]
                    self.leapfrog(thief);
                    // After 32 empty leapfrog attempts, try a random victim instead.
                    #[cfg(feature = "leapfrog_random")]
                    if !self.leapfrog(thief) {
                        attempts -= 1;
                        if attempts == 0 {
                            attempts = 32;
                            self.steal_random();
                        }
                    }
                    // Exponential spin backoff, capped at 1024 iterations.
                    #[cfg(feature = "steal_backoff")]
                    {
                        for _ in 0..backoff {
                            std::hint::spin_loop();
                        }
                        backoff = ((backoff + 1) * 2).min(1024);
                    }
                    thief = self.queue.thief_flag(i)
                }
                let task = self.queue.pop_stolen();
                let mut task: Task<I, O> = task.unerase();
                task.take_output(&mut self.arena)
            }
        }
    }
    /// Waits for the most recently spawned task and returns its output.
    ///
    /// The token only carries types here; the deque itself tracks the task.
    /// `sync_fast` handles the common not-stolen case inline.
    #[inline(always)]
    pub fn sync<I, O>(&mut self, _: SpawnToken<'_, I, O>) -> O {
        if let Some(task) = self.queue.sync_fast() {
            let task: Task<I, O> = task.unerase();
            task.execute(self)
        } else {
            self.sync_slow::<I, O>()
        }
    }
    /// Allocates `task(input)` in the arena and pushes it onto this worker's deque.
    #[inline(always)]
    pub fn spawn<'task, I: 'task, O: 'task>(
        &mut self,
        task: Function<I, O>,
        input: I,
    ) -> SpawnToken<'task, I, O> {
        #[cfg(feature = "metrics")]
        self.metrics.tasks.fetch_add(1, Ordering::Relaxed);
        self.queue
            .push(Task::new(&mut self.arena, task, input).erase());
        SpawnToken {
            _lt: PhantomData,
            _i: PhantomData,
            _o: PhantomData,
        }
    }

    /// Runs `task_a(a)` and `task_b(b)` potentially in parallel.
    ///
    /// `task_a` is spawned (stealable) while `task_b` runs on this thread.
    /// If `task_a` was not stolen, its input is taken back and `task_a` is
    /// called directly instead of through `execute`.
    #[inline(always)]
    pub fn join<A, B, RA, RB>(
        &mut self,
        task_a: Function<A, RA>,
        a: A,
        task_b: Function<B, RB>,
        b: B,
    ) -> (RA, RB) {
        let _ = self.spawn(task_a, a);
        let result_b: RB = task_b(self, b);
        let result_a: RA = if let Some(task) = self.queue.sync_fast() {
            let mut task: Task<A, RA> = task.unerase();
            let ipt = task.take_input(&mut self.arena);
            task_a(self, ipt)
        } else {
            self.sync_slow::<A, RA>()
        };
        (result_a, result_b)
    }

    /// Makes one steal attempt against worker `victim`.
    ///
    /// On success the stolen task runs on this thread and the victim's deque
    /// is told the steal at slot `i` is finished. Metrics (if enabled) count
    /// successes, busy (retry) and empty outcomes.
    fn steal_work(&mut self, victim: usize) -> Steal<()> {
        match self.steal_handles[victim].steal(self.id) {
            #[cfg(not(feature = "unsafe_steal"))]
            Steal::Success((i, mut task)) => {
                #[cfg(feature = "metrics")]
                self.metrics.steal_success.fetch_add(1, Ordering::Relaxed);
                task.execute_stolen(self);
                self.steal_handles[victim].steal_finished(i, task);
                Steal::Success(())
            }
            // NOTE(review): here `task` is a raw pointer dereferenced in `unsafe`;
            // validity is guaranteed by `lace_deque` (not visible here) — confirm.
            #[cfg(feature = "unsafe_steal")]
            Steal::Success((i, task)) => {
                #[cfg(feature = "metrics")]
                self.metrics.steal_success.fetch_add(1, Ordering::Relaxed);
                unsafe { (*task).execute_stolen(self) };
                self.steal_handles[victim].steal_finished(i);
                Steal::Success(())
            }
            Steal::Retry => {
                #[cfg(feature = "metrics")]
                self.metrics.steal_busy.fetch_add(1, Ordering::Relaxed);
                Steal::Retry
            }
            Steal::Empty => {
                #[cfg(feature = "metrics")]
                self.metrics.steal_empty.fetch_add(1, Ordering::Relaxed);
                Steal::Empty
            }
        }
    }
}
264
// With `chase_lev` the deque items carry raw `*mut TypeErasedTask` pointers,
// which make `Worker` `!Send` automatically; each worker is moved onto its own
// thread exactly once (`Worker::thread`).
// NOTE(review): soundness relies on every queued pointer staying valid until
// the owner's `sync` reclaims it from the arena — confirm against `arena`.
#[cfg(feature = "chase_lev")]
unsafe impl Send for Worker {}
// Scheduling operations for the crossbeam (Chase-Lev) backend.
#[cfg(feature = "chase_lev")]
impl Worker {
    /// Creates a fresh thief slot: two handles to one atomic that starts at
    /// `THIEF_NONE`. One handle travels with the queued item (for a thief),
    /// the other stays in the `SpawnToken` (for the owner).
    #[cold]
    fn new_thief_slot() -> (Arc<AtomicUsize>, Arc<AtomicUsize>) {
        let a = Arc::new(AtomicUsize::new(THIEF_NONE));
        (a.clone(), a)
    }
    /// Allocates `task(input)` in the arena and pushes it onto this worker's
    /// deque, reusing a recycled thief slot when one is available.
    ///
    /// The returned token must be synced in LIFO order relative to other
    /// spawns on this worker.
    #[inline(always)]
    pub fn spawn<'task, I: 'task, O: 'task>(
        &mut self,
        task: Function<I, O>,
        input: I,
    ) -> SpawnToken<'task, I, O> {
        let task = Task::new(&mut self.arena, task, input).erase();
        let task_box = self.arena.alloc(task).into_mut_ptr();
        let (thief_a, thief_b) = if let Some(x) = self.recycle_thief.pop() {
            x
        } else {
            Self::new_thief_slot()
        };
        #[cfg(feature = "metrics")]
        self.metrics.tasks.fetch_add(1, Ordering::Relaxed);
        self.queue.push((task_box, thief_a));
        SpawnToken {
            task: task_box,
            thief: thief_b,
            _lt: PhantomData,
            _i: PhantomData,
            _o: PhantomData,
        }
    }
    /// Waits for the task behind `tkn` and returns its output.
    ///
    /// If the task is still in the deque it is popped and executed inline.
    /// Otherwise it was stolen: wait for the thief to publish its id, then
    /// leapfrog (steal back from the thief) until the slot reads
    /// `THIEF_FINISHED`, and finally take the output from the arena task.
    #[inline(always)]
    pub fn sync<I, O>(&mut self, tkn: SpawnToken<'_, I, O>) -> O {
        match self.queue.pop() {
            Some((_ptr, queued_thief)) => {
                muteable_assert!(_ptr == tkn.task, {
                    eprintln!(
                        "Task Pointer Is Not What It Should Be (spawn/sync LIFO order violation)"
                    );
                });
                // Popped by us, so no thief ever touched this slot: recycle it.
                self.recycle_thief.push((tkn.thief, queued_thief));
                let task = self.arena.take(ArenaBox::from_mut_ptr(tkn.task));
                let task: Task<I, O> = task.unerase();
                task.execute(self)
            }
            None => {
                // Stolen. This load must be Acquire as well: a fast thief may
                // already have stored THIEF_FINISHED (Release) before we look,
                // in which case the leapfrog loop below never runs and this is
                // the only load that can synchronize with the thief's writes
                // to the task output.
                let mut thief = THIEF_NONE;
                while thief == THIEF_NONE {
                    thief = tkn.thief.load(Ordering::Acquire);
                }
                #[cfg(feature = "steal_backoff")]
                let mut backoff: usize = 0;
                // The cfg applies only to this `let`; the loop below always exists.
                #[cfg(feature = "leapfrog_random")]
                let mut attempts: usize = 32;
                while thief != THIEF_FINISHED {
                    #[cfg(not(feature = "leapfrog_random"))]
                    self.leapfrog(thief);
                    // After 32 empty leapfrog attempts, try a random victim instead.
                    #[cfg(feature = "leapfrog_random")]
                    if !self.leapfrog(thief) {
                        attempts -= 1;
                        if attempts == 0 {
                            attempts = 32;
                            self.steal_random();
                        }
                    }
                    // Exponential spin backoff, capped at 1024 iterations.
                    #[cfg(feature = "steal_backoff")]
                    {
                        for _ in 0..backoff {
                            std::hint::spin_loop();
                        }
                        backoff = ((backoff + 1) * 2).min(1024);
                    }
                    thief = tkn.thief.load(Ordering::Acquire);
                }
                tkn.thief.store(THIEF_NONE, Ordering::Relaxed);
                let task = self.arena.take(ArenaBox::from_mut_ptr(tkn.task));
                let mut task: Task<I, O> = task.unerase();
                task.take_output(&mut self.arena)
            }
        }
    }

    /// Runs `task_a(a)` and `task_b(b)` potentially in parallel: `task_a` is
    /// spawned (stealable) while `task_b` runs here, then `task_a` is synced.
    #[inline(always)]
    pub fn join<A, B, RA, RB>(
        &mut self,
        task_a: Function<A, RA>,
        a: A,
        task_b: Function<B, RB>,
        b: B,
    ) -> (RA, RB) {
        let tkn = self.spawn(task_a, a);
        let result_b: RB = task_b(self, b);
        let result_a: RA = self.sync::<A, RA>(tkn);
        (result_a, result_b)
    }

    /// Makes one steal attempt against worker `victim`.
    ///
    /// On success: publish our id in the task's thief slot, run the task, then
    /// store `THIEF_FINISHED` with Release so the owner's Acquire load observes
    /// the completed output.
    fn steal_work(&mut self, victim: usize) -> Steal<()> {
        match self.steal_handles[victim].steal() {
            Steal::Success((task, thief_slot)) => {
                #[cfg(feature = "metrics")]
                self.metrics.steal_success.fetch_add(1, Ordering::Relaxed);
                thief_slot.store(self.id, Ordering::Relaxed);
                unsafe { (*task).execute_stolen(self) };
                thief_slot.store(THIEF_FINISHED, Ordering::Release);
                Steal::Success(())
            }
            Steal::Retry => {
                #[cfg(feature = "metrics")]
                self.metrics.steal_busy.fetch_add(1, Ordering::Relaxed);
                Steal::Retry
            }
            Steal::Empty => {
                #[cfg(feature = "metrics")]
                self.metrics.steal_empty.fetch_add(1, Ordering::Relaxed);
                Steal::Empty
            }
        }
    }
}
391
392impl Worker {
393 #[cfg(any(feature = "numa_awareness", feature = "thread_spread", feature = "thread_nospread"))]
394 fn numa_bind(&mut self, pu: usize) {
395 let mut topo = Topology::new().unwrap();
396 let pu_set = topo.objects_with_type(&ObjectType::PU).unwrap()[pu]
397 .cpuset()
398 .unwrap();
399 topo.set_cpubind(pu_set, CpuBindFlags::CPUBIND_THREAD)
400 .expect("Failed to bind thread to CPU");
401 }
402
403 #[inline(always)]
404 fn leapfrog(&mut self, thief: usize) -> bool {
405 match self.steal_work(thief) {
406 Steal::Success(_) => {
407 #[cfg(feature = "metrics")]
408 self.metrics.leap_success.fetch_add(1, Ordering::Relaxed);
409 true
410 }
411 Steal::Empty => {
412 #[cfg(feature = "metrics")]
413 self.metrics.leap_empty.fetch_add(1, Ordering::Relaxed);
414 false
415 }
416 Steal::Retry => {
417 #[cfg(feature = "metrics")]
418 self.metrics.leap_busy.fetch_add(1, Ordering::Relaxed);
419 true
420 }
421 }
422 }
423
424 #[inline(always)]
425 fn steal_random(&mut self) {
426 let n = self.steal_handles.len();
427 let mut victim = self.id;
429 while victim == self.id {
430 victim = self.prng.next(n);
431 }
432 let _ = self.steal_work(victim);
433 }
434
435 fn thread(mut self, #[cfg(any(feature = "numa_awareness", feature = "thread_spread", feature = "thread_nospread"))] pu: usize) -> thread::JoinHandle<()> {
436 thread::Builder::new()
437 .name("Lace Worker".to_string())
438 .stack_size(1024 * 1024 * 16)
439 .spawn(move || {
440 #[cfg(any(feature = "numa_awareness", feature = "thread_spread", feature = "thread_nospread"))]
441 self.numa_bind(pu);
442 while self.keep_going.load(Ordering::Relaxed) {
445 self.steal_random();
446 }
447 })
448 .expect("Failed to spawn Lace Worker Thread")
449 }
450}
451
/// A work-stealing thread pool.
///
/// Worker 0 (`root_worker`) executes on the caller's thread inside `run`;
/// the remaining workers run on background threads whose join handles are
/// kept in `handles`. Dropping the pool calls `stop`.
pub struct Lace {
    /// Worker 0, driven by the caller through `run`.
    root_worker: Worker,
    /// Join handles of the background worker threads (drained by `stop`).
    handles: Vec<thread::JoinHandle<()>>,
    /// Run flag shared with every worker; cleared by `stop`.
    keep_going: Arc<AtomicBool>,
    /// Per-worker counters, indexed by worker id (metrics feature only).
    #[cfg(feature = "metrics")]
    worker_metrics: Vec<Arc<Metrics>>,
}
459impl Lace {
460 pub fn init(n: usize) -> Self {
461 assert!(n != 0, "invalid number of workers");
462 let keep_going = Arc::from(AtomicBool::new(true));
463 let mut workers = Vec::new();
464 let mut steal_handles = Vec::new();
465 #[cfg(feature = "metrics")]
466 let mut worker_metrics = Vec::new();
467
468 for id in 0..n {
469 #[cfg(feature = "metrics")]
470 let metrics = Arc::new(Metrics::default());
471 let (w, s) = Worker::new(
472 id,
473 keep_going.clone(),
474 #[cfg(feature = "metrics")]
475 metrics.clone(),
476 );
477 steal_handles.push(s);
478 workers.push(w);
479 #[cfg(feature = "metrics")]
480 worker_metrics.push(metrics);
481 }
482 for w in workers.iter_mut() {
484 w.steal_handles = steal_handles.clone();
485 }
486 let mut out = Self {
488 root_worker: workers.remove(0),
489 handles: Vec::new(),
490 keep_going,
491 #[cfg(feature = "metrics")]
492 worker_metrics,
493 };
494 #[cfg(any(feature = "thread_spread", feature = "thread_nospread"))]
495 {
496 let topo = Topology::new().unwrap();
497 if !topo.support().cpu().set_current_thread() {
498 panic!("NUMA awareness enabled but thread binding not supported");
499 }
500 let mut allocated_pu = Vec::with_capacity(n);
501 if let Ok(sockets) = topo.objects_with_type(&ObjectType::Package) {
502 let mut socket_pus = Vec::new();
503 for (si, socket) in sockets.iter().enumerate() {
504 socket_pus.push(Vec::new());
505 let mut ci = 0;
506 for child in socket.children().iter() {
507 for gchild in child.children().iter() {
508 for ggchild in gchild.children().iter() {
509 for core in ggchild.children().iter() {
510 assert_eq!( core.object_type(), ObjectType::Core );
511 socket_pus[si].push(Vec::new());
512 for pu in core.children().iter() {
513 assert_eq!( pu.object_type(), ObjectType::PU );
514 socket_pus[si][ci].push(pu.logical_index());
515 }
516 ci += 1;
517 }
518 }
519 }
520 }
521 }
522 #[cfg(feature = "thread_spread")]
523 {
524 let mut ci = Vec::with_capacity(socket_pus.len());
525 for _ in 0..socket_pus.len() {
526 ci.push(0);
527 }
528 let mut si = 0;
529 while allocated_pu.len() < n {
530 allocated_pu.push(socket_pus[si][ci[si]][0] as usize);
531 ci[si] = (ci[si] + 1) % socket_pus[si].len();
532 si = (si + 1) % socket_pus.len();
533 }
534 }
535 #[cfg(feature = "thread_nospread")]
536 for i in 0..n {
537 allocated_pu.push(socket_pus[0][(i/2) % socket_pus[0].len()][i%2] as usize);
538 }
539 }
540 println!("allocated_pu: {allocated_pu:?}");
541 out.root_worker.numa_bind(allocated_pu[0]);
542 for (i, w) in workers.drain(..).enumerate() {
543 out.handles.push(w.thread(allocated_pu[i + 1]));
544 }
545 }
546 #[cfg(feature = "numa_awareness")]
547 {
548 let topo = Topology::new().unwrap();
549 if !topo.support().cpu().set_current_thread() {
550 panic!("NUMA awareness enabled but thread binding not supported");
551 }
552 let mut allocated_pu = Vec::with_capacity(n);
554 if let Ok(cores) = topo.objects_with_type(&ObjectType::Core) {
555 let mut core_pus = Vec::new();
557 for (ci, core) in cores.iter().enumerate() {
558 if let Some(c) = core.first_child() {
559 core_pus.push((ci, c));
560 }
561 }
562 core_pus.reverse();
566 while allocated_pu.len() < n {
569 for (ci, c) in core_pus.iter_mut() {
570 assert_eq!(c.object_type(), ObjectType::PU);
571 let pu = c.logical_index() as usize;
572 allocated_pu.push(pu);
573 if allocated_pu.len() == n {
574 break;
575 }
576 if let Some(sib) = c.next_sibling() {
578 *c = sib;
579 } else {
580 *c = cores[*ci].first_child().unwrap();
581 }
582 }
583 }
584 } else if let Ok(pus) = topo.objects_with_type(&ObjectType::PU) {
585 for id in 0..n {
587 allocated_pu.push(pus[id % pus.len()].logical_index() as usize);
588 }
589 } else {
590 panic!("NUMA awareness enabled but could not determine CPU or PU counts")
591 }
592 out.root_worker.numa_bind(allocated_pu[0]);
593 for (i, w) in workers.drain(..).enumerate() {
594 out.handles.push(w.thread(allocated_pu[i + 1]));
595 }
596 }
597 #[cfg(not(any(feature = "numa_awareness", feature = "thread_spread", feature = "thread_nospread")))]
598 for w in workers.drain(..) {
599 out.handles.push(w.thread());
600 }
601 out
602 }
603
604 #[inline(always)]
605 pub fn run<O>(&mut self, f: impl FnOnce(&mut Worker) -> O) -> O {
606 f(&mut self.root_worker)
607 }
608
609 pub fn stop(&mut self) {
610 self.keep_going.store(false, Ordering::Relaxed);
611 for handle in self.handles.drain(..) {
612 handle.join().expect("failed to stop worker thread");
613 }
614 }
615
616 #[cfg(feature = "metrics")]
617 #[allow(dead_code)] pub fn summarize(&self, normf: usize) {
619 let mut total = Metrics::default();
620 for (id, m) in self.worker_metrics.iter().enumerate() {
621 m.add_into(&mut total);
622 let m = m.normalized(normf);
623 println!("worker {id:2}: {:?}", m);
624 }
625 println!("total: {:?}", total.normalized(normf));
626 }
627}
628impl Drop for Lace {
630 fn drop(&mut self) {
631 self.stop()
632 }
633}
634
/// Runs a `#[lace_task]` function on a pool's root worker:
/// `lace_run!(pool, path::to::task(arg1, arg2))`.
///
/// The arguments are packed as `(arg1, arg2, ..)` and passed, together with
/// the worker, to the task function via `pool.run(..)`. A single argument is
/// passed parenthesized (not as a 1-tuple) and zero arguments become `()`.
#[macro_export]
macro_rules! lace_run {
    ($inst:expr, $($task:ident)::+($($args:expr),*)) => {
        ($inst).run(|__lace_root_worker| {
            $($task)::+(__lace_root_worker, ($($args),*))
        })
    };
}
641
// Smoke tests for spawn/sync/join on a 4-worker pool.
#[cfg(test)]
mod tests {
    use super::*;

    // Mixed-type arguments; always returns 321.
    #[lace_task]
    fn smoke_mixed_args(_: f32, _: bool) -> usize {
        321
    }
    // Increments `*x` until it reaches 15, joining with `smoke_mixed_args` at
    // each level; the base case returns 123, so the min over all levels is 123.
    #[lace_task]
    fn smoke_joining_task(x: &mut usize) -> usize {
        if *x < 15 {
            *x += 1;
            let (a, b) = join!(smoke_mixed_args(0.4, false), smoke_joining_task(x));
            a.min(b)
        } else {
            123
        }
    }
    #[test]
    fn smoke() {
        let mut lace = Lace::init(4);
        let mut x = 11;
        let ret = lace_run!(lace, smoke_joining_task(&mut x));
        assert_eq!(ret, 123);
        assert_eq!(x, 15);
    }

    // dfs(0) = 1 and dfs(n) = n * dfs(n - 1), so dfs(n) = n!.
    // Children are spawned in order and synced in reverse (LIFO).
    #[lace_task]
    fn dfs(n: usize) -> usize {
        if n == 0 {
            return 1;
        }
        let mut tokens = Vec::new();
        for _ in 0..n {
            tokens.push(spawn!(dfs(n - 1)));
        }
        let mut total = 0;
        while let Some(tkn) = tokens.pop() {
            total += sync!(tkn);
        }
        total
    }
    #[test]
    fn largerun() {
        let mut lace = Lace::init(4);
        let x = lace_run!(lace, dfs(10));
        assert_eq!(x, 10 * 9 * 8 * 7 * 6 * 5 * 4 * 3 * 2 * 1);
    }
    // Reuses one pool for many short runs.
    #[test]
    fn manysmallruns() {
        let mut lace = Lace::init(4);
        const N: usize = 1000;
        for _ in 0..N {
            let x = lace_run!(lace, dfs(8));
            assert_eq!(x, 8 * 7 * 6 * 5 * 4 * 3 * 2 * 1);
        }
    }
}