1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
extern crate rand;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use crossbeam::sync::MsQueue;
use rand::Rng;
use errors::*;
use Record;
pub struct Recordset {
instances: AtomicUsize,
record_queue_count: AtomicUsize,
record_queue_size: AtomicUsize,
record_queue: MsQueue<Result<Record>>,
active: AtomicBool,
task_id: AtomicUsize,
}
impl Recordset {
#[doc(hidden)]
pub fn new(rec_queue_size: usize, nodes: usize) -> Self {
let mut rng = rand::thread_rng();
let task_id = rng.gen::<usize>();
Recordset {
instances: AtomicUsize::new(nodes),
record_queue_size: AtomicUsize::new(rec_queue_size),
record_queue_count: AtomicUsize::new(0),
record_queue: MsQueue::new(),
active: AtomicBool::new(true),
task_id: AtomicUsize::new(task_id),
}
}
pub fn close(&self) {
self.active.store(false, Ordering::Relaxed)
}
pub fn is_active(&self) -> bool {
self.active.load(Ordering::Relaxed)
}
#[doc(hidden)]
pub fn push(&self, record: Result<Record>) -> Option<Result<Record>> {
if self.record_queue_count.fetch_add(1, Ordering::Relaxed) <
self.record_queue_size.load(Ordering::Relaxed) {
self.record_queue.push(record);
return None;
}
self.record_queue_count.fetch_sub(1, Ordering::Relaxed);
Some(record)
}
pub fn task_id(&self) -> u64 {
self.task_id.load(Ordering::Relaxed) as u64
}
#[doc(hidden)]
pub fn signal_end(&self) {
if self.instances.fetch_sub(1, Ordering::Relaxed) == 1 {
self.close()
};
}
}
impl<'a> Iterator for &'a Recordset {
type Item = Result<Record>;
fn next(&mut self) -> Option<Result<Record>> {
loop {
if self.is_active() || !self.record_queue.is_empty() {
let result = self.record_queue.try_pop();
if result.is_some() {
self.record_queue_count.fetch_sub(1, Ordering::Relaxed);
return result;
}
thread::yield_now();
continue;
} else {
return None;
}
}
}
}