1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
use std::cmp::max;
use std::sync::{Arc, Mutex};
use crate::errors::Error;
use threadpool::ThreadPool;
/// Worker-thread count used when a mapper does not override
/// `VertexMapper::num_threads`.
const DEFAULT_NUM_THREADS: usize = 8;

/// Page size used when a mapper does not override `VertexMapper::query_limit`.
///
/// Widening `u16::MAX` to `u32` is lossless, so the cast cannot truncate.
const DEFAULT_QUERY_LIMIT: u32 = u16::MAX as u32;
/// Describes a job that the `map` function in this module applies to vertices.
///
/// Only `map` has to be implemented; the remaining methods are tuning hooks
/// with default values.
pub trait VertexMapper: Send + Sync + 'static {
    /// Processes one vertex.
    ///
    /// The driver invokes this from thread-pool workers, so it may run
    /// concurrently for different vertices.
    ///
    /// # Errors
    ///
    /// Any error returned here is recorded by the driver and can end up as the
    /// driver's own result.
    fn map(&self, vertex: indradb::Vertex) -> Result<(), Error>;

    /// Optional type filter placed in the `t` field of each range query.
    ///
    /// Defaults to `None`.
    fn t_filter(&self) -> Option<indradb::Identifier> {
        None
    }

    /// Value used as the `limit` of each range query.
    ///
    /// Defaults to `DEFAULT_QUERY_LIMIT`. The driver raises values below 1 to 1.
    fn query_limit(&self) -> u32 {
        DEFAULT_QUERY_LIMIT
    }

    /// Size of the thread pool the driver creates.
    ///
    /// Defaults to `DEFAULT_NUM_THREADS`. The driver raises values below 1 to 1.
    fn num_threads(&self) -> usize {
        DEFAULT_NUM_THREADS
    }
}
/// Applies `mapper` to the vertices of `datastore`, running the per-vertex work
/// on a thread pool.
///
/// Vertices are fetched page by page with `indradb::RangeVertexQuery`. Each
/// page is limited to `mapper.query_limit()` vertices and filtered by
/// `mapper.t_filter()`. Every fetched vertex is submitted to a pool of
/// `mapper.num_threads()` workers, each of which calls `mapper.map` on it.
/// Both the limit and the thread count are raised to 1 if they are lower.
///
/// Paging stops when a page is shorter than the limit, when a query fails, or
/// when an error has already been recorded at the time the next page would be
/// requested. Jobs that were already submitted are not cancelled; the function
/// waits for the pool to drain before returning.
///
/// # Errors
///
/// Returns the error left in the shared error slot once all jobs have finished.
/// The slot is overwritten by every failing `mapper.map` call and by a failing
/// datastore query, so when several failures occur only the last one written is
/// reported.
///
/// # Panics
///
/// Panics if the mutex guarding the error slot is poisoned.
pub fn map<M: VertexMapper>(
    mapper: Arc<M>,
    datastore: Arc<dyn indradb::Datastore + Send + Sync + 'static>,
) -> Result<(), Error> {
    let pool = ThreadPool::new(max(mapper.num_threads(), 1));
    let query_limit = max(mapper.query_limit(), 1);
    let t_filter = mapper.t_filter();
    let last_err: Arc<Mutex<Option<Error>>> = Arc::new(Mutex::new(None));
    let mut last_id: Option<uuid::Uuid> = None;

    loop {
        // Stop requesting pages as soon as any job has reported a failure.
        if last_err.lock().unwrap().is_some() {
            break;
        }

        let q = indradb::RangeVertexQuery {
            limit: query_limit,
            t: t_filter.clone(),
            start_id: last_id,
        };

        let vertices = match datastore.get_vertices(q.into()) {
            Ok(value) => value,
            Err(err) => {
                *last_err.lock().unwrap() = Some(err.into());
                break;
            }
        };

        // A short page means there is nothing left to fetch after this one.
        let is_last_query = vertices.len() < query_limit as usize;

        // NOTE(review): the next page starts at the id of this page's last
        // vertex. Confirm whether `start_id` is an inclusive bound in the
        // datastore; if so, the boundary vertex is handed to the mapper twice.
        if let Some(last_vertex) = vertices.last() {
            last_id = Some(last_vertex.id);
        }

        for vertex in vertices {
            let worker_mapper = Arc::clone(&mapper);
            let err_slot = Arc::clone(&last_err);
            pool.execute(move || {
                if let Err(err) = worker_mapper.map(vertex) {
                    *err_slot.lock().unwrap() = Some(err);
                }
            });
        }

        if is_last_query {
            break;
        }
    }

    // Wait for every submitted job so that the error slot is final.
    pool.join();

    // Bind the result first so the lock guard is released before `last_err`
    // goes out of scope.
    let recorded = last_err.lock().unwrap().take();
    match recorded {
        Some(err) => Err(err),
        None => Ok(()),
    }
}