1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use std::sync::{Arc, Mutex};
use std::thread::{self, yield_now, JoinHandle};
use super::{Executable, TaskQueue, TaskQueueSet};

#[derive(Debug)]
pub struct Worker {
  is_running: Arc<Mutex<bool>>,
  inner_handle: JoinHandle<()>,
  task_queue: TaskQueue,
}

impl Worker {
  pub fn new(task_queue_set: TaskQueueSet) -> Self {
    let is_running = Arc::new(Mutex::new(true));
    let task_queue = task_queue_set.new_queue();
    let inner_handle = WorkerInner::init(is_running.clone(), task_queue_set, task_queue.clone());
    Self {
      is_running,
      task_queue,
      inner_handle,
    }
  }

  pub fn run<E>(&self, task: E)
  where
    E: Executable + 'static,
  {
    self.task_queue.insert(Box::new(task));
  }

  pub fn finish(self) {
    *self.is_running.lock().unwrap() = false;
    self.inner_handle.join().unwrap();
  }
}

pub struct WorkerInner {
  is_running: Arc<Mutex<bool>>,
  task_queue_set: TaskQueueSet,
  task_queue: TaskQueue,
}

impl WorkerInner {
  fn init(
    is_running: Arc<Mutex<bool>>,
    task_queue_set: TaskQueueSet,
    task_queue: TaskQueue,
  ) -> JoinHandle<()> {
    thread::spawn(move || {
      WorkerInner::new(is_running, task_queue_set, task_queue).run();
    })
  }

  fn new(
    is_running: Arc<Mutex<bool>>,
    task_queue_set: TaskQueueSet,
    task_queue: TaskQueue,
  ) -> Self {
    Self {
      is_running,
      task_queue_set,
      task_queue,
    }
  }

  fn run(&self) {
    loop {
      match self.task_queue.next() {
        Some(mut task) => loop {
          if task.exec() {
            break;
          }
        },
        None => if !self.try_steal() {
          if !*self.is_running.lock().unwrap() {
            break;
          }
          yield_now();
        },
      }
    }
  }

  fn try_steal(&self) -> bool {
    let mut tasks = self.task_queue_set.steal_from_rand_queue();
    if tasks.len() > 0 {
      self.task_queue.append(&mut tasks);
      true
    } else {
      false
    }
  }
}