indradb_plugin_host/
util.rs1use std::cmp::max;
2use std::sync::{Arc, Mutex};
3
4use crate::errors::Error;
5
6use threadpool::ThreadPool;
7
/// Thread count returned by the default implementation of
/// `VertexMapper::num_threads`.
const DEFAULT_NUM_THREADS: usize = 8;
9
10pub trait VertexMapper: Send + Sync + 'static {
12 fn num_threads(&self) -> usize {
14 DEFAULT_NUM_THREADS
15 }
16 fn map(&self, vertex: indradb::Vertex) -> Result<(), Error>;
18}
19
20pub fn map<'a, M: VertexMapper>(txn: &(dyn indradb::Transaction<'a> + 'a), mapper: Arc<M>) -> Result<(), Error> {
26 let first_err: Arc<Mutex<Option<Error>>> = Arc::new(Mutex::new(None));
27 let pool = ThreadPool::new(max(mapper.num_threads(), 1));
28 let txn_ptr = txn as *const dyn indradb::Transaction<'a>;
29 let mut i = 0;
30
31 for vertex in unsafe { (*txn_ptr).all_vertices()? } {
32 i += 1;
33 if i % 1000 == 0 && first_err.lock().unwrap().is_some() {
34 break;
37 }
38 match vertex {
39 Ok(vertex) => {
40 let vertex: indradb::Vertex = vertex;
41 let mapper = mapper.clone();
42 let first_err = first_err.clone();
43 pool.execute(move || {
44 if let Err(err) = mapper.map(vertex) {
45 let mut first_err = first_err.lock().unwrap();
46 if first_err.is_none() {
47 *first_err = Some(err);
48 }
49 }
50 });
51 }
52 Err(err) => {
53 let mut first_err = first_err.lock().unwrap();
54 if first_err.is_none() {
55 *first_err = Some(Error::IndraDB(err));
56 }
57 }
58 }
59 }
60
61 pool.join();
62
63 let mut first_err = first_err.lock().unwrap();
64 if first_err.is_some() {
65 Err(first_err.take().unwrap())
66 } else {
67 Ok(())
68 }
69}