1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use rayon::prelude::*;
use crate::item::{ItemPool, MatchedItem};
use crate::spinlock::SpinLock;
use crate::{CaseMatching, MatchEngineFactory};
use defer_drop::DeferDrop;
use std::rc::Rc;
pub struct MatcherControl {
stopped: Arc<AtomicBool>,
processed: Arc<AtomicUsize>,
matched: Arc<AtomicUsize>,
items: Arc<SpinLock<Vec<MatchedItem>>>,
thread_matcher: JoinHandle<()>,
}
impl MatcherControl {
pub fn get_num_processed(&self) -> usize {
self.processed.load(Ordering::Relaxed)
}
pub fn get_num_matched(&self) -> usize {
self.matched.load(Ordering::Relaxed)
}
pub fn kill(self) {
self.stopped.store(true, Ordering::Relaxed);
let _ = self.thread_matcher.join();
}
pub fn stopped(&self) -> bool {
self.stopped.load(Ordering::Relaxed)
}
pub fn into_items(self) -> Arc<SpinLock<Vec<MatchedItem>>> {
while !self.stopped.load(Ordering::Relaxed) {}
self.items
}
}
pub struct Matcher {
engine_factory: Rc<dyn MatchEngineFactory>,
case_matching: CaseMatching,
}
impl Matcher {
pub fn builder(engine_factory: Rc<dyn MatchEngineFactory>) -> Self {
Self {
engine_factory,
case_matching: CaseMatching::default(),
}
}
pub fn case(mut self, case_matching: CaseMatching) -> Self {
self.case_matching = case_matching;
self
}
pub fn build(self) -> Self {
self
}
pub fn run<C>(&self, query: &str, item_pool: Arc<DeferDrop<ItemPool>>, callback: C) -> MatcherControl
where
C: Fn(Arc<SpinLock<Vec<MatchedItem>>>) + Send + 'static,
{
let matcher_engine = self.engine_factory.create_engine_with_case(query, self.case_matching);
debug!("engine: {}", matcher_engine);
let stopped = Arc::new(AtomicBool::new(false));
let stopped_clone = stopped.clone();
let processed = Arc::new(AtomicUsize::new(0));
let processed_clone = processed.clone();
let matched = Arc::new(AtomicUsize::new(0));
let matched_clone = matched.clone();
let matched_items = Arc::new(SpinLock::new(Vec::new()));
let matched_items_clone = matched_items.clone();
let thread_matcher = thread::spawn(move || {
let num_taken = item_pool.num_taken();
let items = item_pool.take();
trace!("matcher start, total: {}", items.len());
let result: Result<Vec<_>, _> = items
.into_par_iter()
.enumerate()
.filter_map(|(index, item)| {
processed.fetch_add(1, Ordering::Relaxed);
if stopped.load(Ordering::Relaxed) {
Some(Err("matcher killed"))
} else if let Some(match_result) = matcher_engine.match_item(item.clone()) {
matched.fetch_add(1, Ordering::Relaxed);
Some(Ok(MatchedItem {
item: item.clone(),
rank: match_result.rank,
matched_range: Some(match_result.matched_range),
item_idx: (num_taken + index) as u32,
}))
} else {
None
}
})
.collect();
if let Ok(items) = result {
let mut pool = matched_items.lock();
*pool = items;
trace!("matcher stop, total matched: {}", pool.len());
}
callback(matched_items.clone());
stopped.store(true, Ordering::Relaxed);
});
MatcherControl {
stopped: stopped_clone,
matched: matched_clone,
processed: processed_clone,
items: matched_items_clone,
thread_matcher,
}
}
}