1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use crate::hashmap::HashMap;
use crate::stack::Stack;
use crndm::default::*;
use crndm::sync::VWeak;
use regex::Regex;
/// Memory pool type used throughout this module: transactions are run via
/// `P::transaction` and the progress output reports `P::used()`.
type P = BuddyAlloc;
/// Mutable working state of a `Consumer`; `Consumer` keeps it behind a `PMutex`.
struct ConsumerData {
/// Line currently being processed; empty when no line is pending.
buf: PString,
/// Word counts gathered by this consumer that have not yet been merged into
/// the shared `words` map.
local: HashMap<PString, u64>,
/// Initialised to `true`; `Consumer::stop_when_finished` sets it to `false`,
/// which lets the processing loop end once the shared line stack is empty.
active: bool,
}
/// A worker that pops lines from a shared stack, extracts words with a regular
/// expression, and adds their counts to a shared map.
pub struct Consumer {
/// Regular-expression source; capture group 1 of every match is counted as a word.
pattern: PString,
/// Lock-protected working state (current line, local counts, active flag).
data: PMutex<ConsumerData>,
/// Shared stack of input lines that `start` pops from.
lines: Parc<PMutex<Stack<PString>>>,
/// Shared word-to-count map into which `start` merges the local counts.
words: Parc<PMutex<HashMap<PString, u64>>>,
}
impl Consumer {
pub fn new(
pattern: &str,
lines: Parc<PMutex<Stack<PString>>>,
words: Parc<PMutex<HashMap<PString, u64>>>,
j: &Journal,
) -> Self {
Self {
pattern: PString::from_str(pattern, j),
lines,
words,
data: PMutex::new(
ConsumerData {
buf: PString::new(j),
local: HashMap::new(j),
active: true,
},
j,
),
}
}
/// Starts processing `lines` and updating `words`
pub fn start(slf: VWeak<Consumer, P>) {
loop {
// Read from global buffer to the local buffer
if !P::transaction(|j| {
if let Some(slf) = slf.upgrade(j) {
let mut this = slf.data.lock(j);
if this.buf.is_empty() {
let mut lines = slf.lines.lock(j);
let line = lines.pop(j);
eprint!(
"\r\x1b[?25lRemaining: {:<12} Memory usage: {:<9} bytes \x1b[?25h",
lines.len(),
P::used()
);
if let Some(line) = line {
this.buf = line;
true // Still working
} else {
this.active
}
} else {
true
}
} else {
false
}
})
.unwrap()
{
return;
}
// counting words
P::transaction(|j| {
if let Some(slf) = slf.upgrade(j) {
let mut this = slf.data.lock(j);
if !this.buf.is_empty() {
let buf = this.buf.to_string();
let re = Regex::new(slf.pattern.as_str()).unwrap();
for cap in re.captures_iter(&buf) {
let w = cap.get(1).unwrap().as_str().to_pstring(j);
this.local.update_with(&w, j, |v| v + 1);
}
this.buf.clear();
}
}
})
.unwrap();
// Updating global `words` buffer with the local buffer
P::transaction(|j| {
if let Some(slf) = slf.upgrade(j) {
let mut this = slf.data.lock(j);
let mut words = slf.words.lock(j);
this.local.foreach(|k, v| {
words.update_with(k, j, |v0| v0 + v);
});
this.local.clear(j);
}
})
.unwrap();
}
}
pub fn stop_when_finished(&self) {
P::transaction(|j| {
let mut this = self.data.lock(j);
this.active = false;
})
.unwrap();
}
}