indradb_plugin_host/
util.rs

1use std::cmp::max;
2use std::sync::{Arc, Mutex};
3
4use crate::errors::Error;
5
6use threadpool::ThreadPool;
7
8const DEFAULT_NUM_THREADS: usize = 8;
9
10/// Trait for running an operation on all vertices in a datastore.
11pub trait VertexMapper: Send + Sync + 'static {
12    /// The number of threads that should execute the map operation.
13    fn num_threads(&self) -> usize {
14        DEFAULT_NUM_THREADS
15    }
16    /// The map operation.
17    fn map(&self, vertex: indradb::Vertex) -> Result<(), Error>;
18}
19
20/// Runs an operation on all vertices in the datastore.
21///
22/// # Arguments
23/// * `mapper`: Specified options and the map operation to run.
24/// * `database`: The database.
25pub fn map<'a, M: VertexMapper>(txn: &(dyn indradb::Transaction<'a> + 'a), mapper: Arc<M>) -> Result<(), Error> {
26    let first_err: Arc<Mutex<Option<Error>>> = Arc::new(Mutex::new(None));
27    let pool = ThreadPool::new(max(mapper.num_threads(), 1));
28    let txn_ptr = txn as *const dyn indradb::Transaction<'a>;
29    let mut i = 0;
30
31    for vertex in unsafe { (*txn_ptr).all_vertices()? } {
32        i += 1;
33        if i % 1000 == 0 && first_err.lock().unwrap().is_some() {
34            // Break on error, but also only check every once in a while since
35            // the error is behind a mutex.
36            break;
37        }
38        match vertex {
39            Ok(vertex) => {
40                let vertex: indradb::Vertex = vertex;
41                let mapper = mapper.clone();
42                let first_err = first_err.clone();
43                pool.execute(move || {
44                    if let Err(err) = mapper.map(vertex) {
45                        let mut first_err = first_err.lock().unwrap();
46                        if first_err.is_none() {
47                            *first_err = Some(err);
48                        }
49                    }
50                });
51            }
52            Err(err) => {
53                let mut first_err = first_err.lock().unwrap();
54                if first_err.is_none() {
55                    *first_err = Some(Error::IndraDB(err));
56                }
57            }
58        }
59    }
60
61    pool.join();
62
63    let mut first_err = first_err.lock().unwrap();
64    if first_err.is_some() {
65        Err(first_err.take().unwrap())
66    } else {
67        Ok(())
68    }
69}