threads_pool 0.2.6

This package provides an easy way to create and manage thread pools, so you don't have to.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
#![allow(dead_code)]

//use std::future::Future;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc, Weak,
};
use std::thread;
use std::time::{Duration, SystemTime};

use crate::debug::is_debug_mode;
use crate::manager::{IdleThreshold, StatusBehaviorDefinitions, StatusBehaviors};
use crate::model::*;
use crate::pool::PoolStatus;
use crossbeam_channel as channel;

const TIMEOUT: Duration = Duration::from_micros(16);
const LONG_TIMEOUT: Duration = Duration::from_micros(96);
const LOT_COUNTS: usize = 3;
const LONG_PARKING_ROUNDS: u8 = 8;
const SHORT_PARKING_ROUNDS: u8 = 2;

/*
struct FutWorker {
    local_pool: LocalPool,
    tasks_queue: LocalSpawner,
    tasks_counter: usize,
}

impl FutWorker {
    fn new() -> FutWorker {
        let local_pool = LocalPool::new();
        let tasks_queue = local_pool.spawner();

        FutWorker {
            local_pool,
            tasks_queue,
            tasks_counter: 0,
        }
    }

    fn push<F: Future<Output = ()> + Send + 'static>(&mut self, job: F) -> Result<(), String> {
        self.tasks_queue
            .spawn_local(job)
            .and_then(|_| {
                self.tasks_counter += 1;
                Ok(())
            })
            .map_err(|_| String::from("The futures pool has been shutdown ... "))
    }

    fn run(&mut self) -> bool {
        while self.tasks_counter > 0 && self.local_pool.try_run_one() {
            self.tasks_counter -= 1;
        }

        self.tasks_counter == 0
    }

    #[inline]
    fn is_empty(&self) -> bool {
        self.tasks_counter == 0
    }
}
*/

thread_local!();

pub(crate) struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
    stat: Weak<AtomicUsize>,
    before_drop: Option<WorkerUpdate>,
    after_drop: Option<WorkerUpdate>,
}

struct WorkStatus(i8, Option<Job>);

impl Worker {
    /// Create and spawn the worker, this will dispatch the worker to listen to work queue immediately
    pub(crate) fn new(
        name: Option<String>,
        my_id: usize,
        stack_size: usize,
        privileged: bool,
        rx_pair: (channel::Receiver<Message>, channel::Receiver<Message>),
        shared_info: (PoolStatus, IdleThreshold), // (idle_threshold, pool_status)
        behavior_definition: &StatusBehaviors,
    ) -> Worker {
        behavior_definition.before_start(my_id);

        let (worker, stat) =
            Self::spawn_worker(name, my_id, stack_size, privileged, rx_pair, shared_info);

        behavior_definition.after_start(my_id);

        Worker {
            id: my_id,
            thread: Some(worker),
            stat,
            before_drop: behavior_definition.before_drop_clone(),
            after_drop: behavior_definition.after_drop_clone(),
        }
    }

    /// Get the worker id
    pub(crate) fn get_id(&self) -> usize {
        self.id
    }

    /// Calling `retire` on a worker will block the thread until the worker has done its work, or wake
    /// up from hibernation. This could block the caller for an undetermined amount of time.
    pub(crate) fn retire(&mut self) {
        if let Some(handle) = self.thread.take() {
            if let Some(stat) = self.stat.upgrade() {
                stat.store(1, Ordering::SeqCst);
            }

            // make sure we can wake up and quit
            handle.thread().unpark();

            // make sure the work is done
            handle.join().unwrap_or_else(|err| {
                eprintln!("Unable to drop worker: {}, error: {:?}", self.id, err);
            });
        }
    }

    /// If the worker has been put to sleep (i.e. in `park` mode), wake it up. This API will not check
    /// if the worker is actually hibernating or not.
    pub(crate) fn wake_up(&self) {
        if let Some(handle) = self.thread.as_ref() {
            handle.thread().unpark();
        }
    }

    /// Check if the worker has quit its inner loop and ready to be joined
    pub(crate) fn is_terminated(&self) -> bool {
        if let Some(stat) = self.stat.upgrade() {
            return stat.load(Ordering::Acquire) == 2usize;
        }

        false
    }

    fn spawn_worker(
        name: Option<String>,
        my_id: usize,
        stack_size: usize,
        privileged: bool,
        rx_pair: (channel::Receiver<Message>, channel::Receiver<Message>),
        shared_info: (PoolStatus, IdleThreshold),
    ) -> (thread::JoinHandle<()>, Weak<AtomicUsize>) {
        let mut builder = thread::Builder::new();

        if name.is_some() {
            builder = builder.name(name.unwrap_or_else(|| format!("worker-{}", my_id)));
        }

        if stack_size > 0 {
            builder = builder.stack_size(stack_size);
        }

        let worker_stat = Arc::new(AtomicUsize::new(0));
        let stat_clone = Arc::downgrade(&worker_stat);

        let handle = builder
            .spawn(move || {
                let mut idle_stat: Option<u8>;
                let mut status: u8;
                let mut pri_work_count: u8 = 0;

                let mut since = if privileged {
                    None
                } else {
                    Some(SystemTime::now())
                };

                // unpack the shared info triple
                let (pool_status, idle_threshold) = shared_info;
                let (pri_wait, norm_wait) = match my_id % LOT_COUNTS {
                    0 => (true, false),
                    1 => (false, true),
                    _ => (false, false),
                };

                // main worker loop
                loop {
                    // get ready to take new work from the channel
                    if worker_stat.load(Ordering::SeqCst) == 1usize {
                        return;
                    }

                    // get the pool status code
                    status = pool_status.load();
                    if status == FLAG_FORCE_CLOSE
                        || ((status == FLAG_CLOSING || status == FLAG_REST)
                            && rx_pair.0.is_empty()
                            && rx_pair.1.is_empty())
                    {
                        // if shutting down, check if we can abandon all work by checking forced
                        // close flag, or when all work have been processed.
                        worker_stat.store(1, Ordering::SeqCst);
                        return;
                    }

                    // wait for work loop
                    let work = match Worker::check_queues(
                        &rx_pair.0,
                        &rx_pair.1,
                        pri_wait,
                        norm_wait,
                        &mut pri_work_count,
                    ) {
                        // if the channels are disconnected, return
                        WorkStatus(-1, _) => {
                            worker_stat.store(1, Ordering::SeqCst);
                            return;
                        }
                        WorkStatus(_, job) => job,
                    };

                    // if there's a job, get it done first, and calc the idle period since last actual job
                    idle_stat =
                        // if we have work, do them now
                        Worker::handle_work(
                            work,
                            &mut since
                        )
                        .or_else(|| {
                            // if we don't have the work, calculate the idle period
                            Worker::calc_idle(&since)
                        })
                        .and_then(|idle| {
                            // if idled longer than the expected worker life for unprivileged workers,
                            // then we're done now -- self-purging.
                            let stat_code = idle_threshold.idle_stat(idle.as_secs() as u64);

                            if stat_code > 0 {
                                // mark self as a voluntary retiree
                                worker_stat.store(stat_code as usize, Ordering::SeqCst);
                                return Some(stat_code);
                            }

                            None
                        });

                    /*
                    // if not done and it's a target kill, handle it now
                    done = done
                        || target
                            .and_then(|id_slice| {
                                if id_slice.is_empty() {
                                    return None;
                                }

                                // write and done, keep the write lock scoped and update the graveyard
                                let mut found = false;
                                let mut g = graveyard.write();

                                for id in id_slice {
                                    // update graveyard for clean up purposes
                                    g.insert(id);

                                    // only receiving the universal kill-signal from closing the channel
                                    found = found || id == my_id;
                                }

                                // if my id or a forced kill, just quit
                                if found {
                                    return Some(());
                                }

                                None
                            })
                            .is_some();
                    */

                    match idle_stat {
                        Some(1) => {
                            pool_status.toggle_flag(FLAG_SLEEP_WORKERS, true);
                            thread::park();
                        },
                        Some(2) => return,
                        _ => {}
                    }

                    if status == FLAG_HIBERNATING {
                        thread::park();
                    }
                }
            })
            .unwrap();

        (handle, stat_clone)
    }

    fn check_queues(
        pri_chan: &channel::Receiver<Message>,
        norm_chan: &channel::Receiver<Message>,
        pri_wait: bool,
        norm_wait: bool,
        pri_work_count: &mut u8,
    ) -> WorkStatus {
        // wait for work loop, 1/3 of workers will long-park for priority work, and 1/3 of workers
        // will long-park for normal work, the remainder 1/3 workers will be fluid and constantly
        // query both queues -- whichever yield a task, then it will execute that task.
        if *pri_work_count < 255 {
            // 1/3 of the workers is designated to wait longer for prioritised jobs
            let norm_full = norm_chan.is_full();

            match Worker::fetch_work(pri_chan, norm_full && !pri_wait) {
                Ok(message) => {
                    // message is the only place that can update the "done" field
                    let (job, _) = Worker::unpack_message(message);

                    if *pri_work_count < 4 {
                        // only add if we're below the continuous pri-work cap
                        *pri_work_count += 1;
                    } else if norm_full {
                        // if we've done 4 or more priority work in a row, check if
                        // we should skip if the normal channel is full and maybe
                        // blocking, by setting the special number
                        *pri_work_count = 255;
                    }

                    return WorkStatus(0, job);
                }
                Err(channel::RecvTimeoutError::Disconnected) => {
                    // sender has been dropped
                    return WorkStatus(-1, None);
                }
                Err(channel::RecvTimeoutError::Timeout) => {
                    // if chan empty, do nothing and fall through to the normal chan handle
                    // fall-through
                }
            };
        } else {
            // if the worker has performed 4 consecutive prioritized work and the normal
            // channel is full, we skip the priority work once to pick up a normal work
            // such that it won't be blocked forever; meanwhile, reset the counter.
            *pri_work_count = 0;
        }

        // 1/3 of the workers is designated to wait longer for normal jobs
        match Worker::fetch_work(norm_chan, pri_chan.is_full() && !norm_wait) {
            Ok(message) => {
                // message is the only place that can update the "done" field
                let (job, _) = Worker::unpack_message(message);
                *pri_work_count = 0;

                return WorkStatus(0, job);
            }
            Err(channel::RecvTimeoutError::Disconnected) => {
                // sender has been dropped
                return WorkStatus(-1, None);
            }
            Err(channel::RecvTimeoutError::Timeout) => {
                // nothing to receive yet
            }
        };

        WorkStatus(0, None)
    }

    fn fetch_work(
        main_chan: &channel::Receiver<Message>,
        can_skip: bool,
    ) -> Result<Message, channel::RecvTimeoutError> {
        let mut wait = 0;
        let rounds = if can_skip {
            SHORT_PARKING_ROUNDS
        } else {
            LONG_PARKING_ROUNDS
        };

        loop {
            wait += 1;

            match main_chan.try_recv() {
                Ok(work) => return Ok(work),
                Err(channel::TryRecvError::Disconnected) => {
                    return Err(channel::RecvTimeoutError::Disconnected)
                }
                Err(channel::TryRecvError::Empty) => {
                    if can_skip {
                        // if there're normal work in queue, break to fetch the normal work
                        return Err(channel::RecvTimeoutError::Timeout);
                    }
                }
            }

            if wait > rounds {
                return Err(channel::RecvTimeoutError::Timeout);
            }
        }
    }

    fn handle_work(work: Option<Job>, since: &mut Option<SystemTime>) -> Option<Duration> {
        if let Some(w) = work {
            w.call_box();
        }

        let mut idle = None;
        if since.is_some() {
            idle = Worker::calc_idle(&since);
            since.replace(SystemTime::now());
        }

        idle
    }

    fn unpack_message(message: Message) -> (Option<Job>, Option<Vec<usize>>) {
        match message {
            Message::SingleJob(job) => (Some(job), None),
            Message::ChainedJobs(_) => unreachable!(),
            Message::Terminate(target) => (None, Some(target)),
        }
    }

    fn calc_idle(since: &Option<SystemTime>) -> Option<Duration> {
        if let Some(s) = since {
            if let Ok(e) = s.elapsed() {
                return Some(e);
            }
        }

        None
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        if let Some(behavior) = self.before_drop {
            behavior(self.id);
        }

        if is_debug_mode() {
            println!("Dropping worker {}", self.id);
        }

        self.retire();

        if let Some(behavior) = self.after_drop {
            behavior(self.id);
        }
    }
}