worker_pool/
crate_macros.rs

// NOTE: It seems that a plain `use X;` in this file does not carry over into the
// expansions of the exported macros, so each macro has to name the items it uses by full path.
2
/**
Wrapper around `Receiver<DownMsg<T>>::recv`, meant to be used in a loop:

- if `Stop` or `Err` is received, breaks from the parent loop
- if `Pause` or `Continue` is received, it is skipped and the macro keeps blocking for the next message
- if `Other(x)` is received, returns `x`

The receiver may be any expression (`rx`, `self.rx`, `pair.1`, ...); it is evaluated
once per macro invocation and only borrowed.

Because the expansion contains a bare `break`, the macro must be invoked directly inside
a loop of the caller.

# Example

```
# use worker_pool::*;
# let mut pool: WorkerPool<(), usize> = WorkerPool::new(100);
pool.execute(|_tx, rx| {
    loop {
        let msg = worker_pool::recv_break!(rx);
        // Do something with msg
    }
});
# pool.broadcast(DownMsg::Other(10));
# pool.stop_and_join();
```
*/
#[macro_export]
macro_rules! recv_break {
    ( $rx:expr ) => {{
        // Borrow the receiver once so the caller's expression is not re-evaluated
        // on every iteration of the waiting loop below.
        let rx = &$rx;

        // `$crate` keeps the paths valid no matter how the caller names this crate.
        // `None` means "leave the caller's loop"; the actual `break` has to happen
        // outside of this inner loop, otherwise it would only exit the inner loop.
        let received = loop {
            match rx.recv() {
                Ok($crate::DownMsg::Other(x)) => break Some(x),
                Ok($crate::DownMsg::Pause) | Ok($crate::DownMsg::Continue) => {}
                Ok($crate::DownMsg::Stop) | Err(_) => break None,
            }
        };

        match received {
            Some(x) => x,
            None => break,
        }
    }};
}
/**
Wrapper around `Receiver<DownMsg<T>>::try_recv`, meant to be used in a loop:

- if `Stop` or `Disconnected` is received, breaks from the parent loop
- if `Pause` is received, then block until `Continue` is received; any `Other` message
  (and any repeated `Pause`) will be ignored between the two. If `Stop` is received or the
  channel disconnects while paused, breaks from the parent loop
- if `Continue` or `Empty` is received, returns `None`
- if `Other(x)` is received, returns `Some(x)`

The receiver may be any expression (`rx`, `self.rx`, `pair.1`, ...); it is evaluated
once per macro invocation and only borrowed.

Because the expansion contains a bare `break`, the macro must be invoked directly inside
a loop of the caller.

# Example

```
# use worker_pool::*;
# let mut pool: WorkerPool<(), usize> = WorkerPool::new(100);
pool.execute(|_tx, rx| {
    # let mut count = 0;
    loop {
        if let Some(msg) = worker_pool::try_recv_break!(rx) {
            // Handle msg
            # count = msg;
        } else {
            // Do something else in the meantime
            # count += 1;
        }
    }
});
# pool.broadcast(DownMsg::Other(10));
# pool.stop_and_join();
```
*/
#[macro_export]
macro_rules! try_recv_break {
    ( $rx:expr ) => {{
        // Borrow the receiver once so the caller's expression is evaluated a single time,
        // even though it is polled here and possibly waited on in the paused loop.
        let rx = &$rx;

        // `$crate` and the leading `::` on `std` keep every path valid regardless of how
        // the caller names this crate or what is in scope at the call site.
        match rx.try_recv() {
            Ok($crate::DownMsg::Other(x)) => Some(x),
            Ok($crate::DownMsg::Continue) => None,
            Ok($crate::DownMsg::Pause) => {
                // Paused: switch to blocking receives until resumed or told to stop.
                // The result is carried out as a flag because a `break` written inside
                // this inner loop could not reach the caller's loop.
                let resumed = loop {
                    match rx.recv() {
                        Ok($crate::DownMsg::Continue) => break true,
                        Ok($crate::DownMsg::Pause) | Ok($crate::DownMsg::Other(_)) => {}
                        Ok($crate::DownMsg::Stop) | Err(_) => break false,
                    }
                };

                if resumed {
                    None
                } else {
                    break
                }
            }
            Ok($crate::DownMsg::Stop) => break,
            Err(::std::sync::mpsc::TryRecvError::Empty) => None,
            Err(::std::sync::mpsc::TryRecvError::Disconnected) => break,
        }
    }};
}