mod counter;
mod master;
mod response_chan;
mod worker;
pub use self::response_chan::{bounded, ResponseReceiver, ResponseSender, Update};
pub use self::master::Master;
use std::thread::{sleep, spawn};
use statics;
use uuid::Uuid;
use self::counter::Counter;
use indradb::VertexQuery;
use serde_json::Value as JsonValue;
use std::time::Duration;
use indradb::{Datastore, Transaction};
lazy_static! {
static ref REPORT_TIME: Duration = Duration::from_secs(10);
}
pub fn execute_mapreduce(contents: String, path: String, arg: JsonValue, sender: ResponseSender) {
let sent = Counter::new();
let processing = Counter::new();
let finished = Counter::new();
{
let sender = sender.clone();
let sent = sent.clone();
let processing = processing.clone();
let finished = finished.clone();
spawn(move || loop {
sleep(*REPORT_TIME);
let message = json!({
"sent": sent.get(),
"processing": processing.get(),
"finished": finished.get()
});
if sender.0.send(Update::Ping(message)).is_err() {
return;
}
});
}
spawn(move || {
let trans = match statics::DATASTORE.transaction() {
Ok(trans) => trans,
Err(err) => {
let message = format!("Query setup failed: {:?}", err);
sender.0.send(Update::Err(json!(message))).ok();
return;
}
};
let mapreducer = Master::start(&contents, &path, &arg, sent, &processing, &finished);
let mut last_id: Option<Uuid> = None;
loop {
let q = VertexQuery::All {
start_id: last_id,
limit: *statics::MAP_REDUCE_QUERY_LIMIT,
};
let vertices = match trans.get_vertices(&q) {
Ok(vertices) => vertices,
Err(err) => {
let message = format!("Query failed: {:?}", err);
sender.0.send(Update::Err(json!(message))).ok();
break;
}
};
let mut done = vertices.len() < *statics::MAP_REDUCE_QUERY_LIMIT as usize;
if let Some(last_vertex) = vertices.last() {
last_id = Some(last_vertex.id);
}
for vertex in vertices {
if !mapreducer.add_vertex(vertex) {
done = true;
break;
}
}
if done {
break;
}
}
match mapreducer.join() {
Ok(value) => {
sender.0.send(Update::Ok(value)).ok();
}
Err(err) => {
let message = format!("Mapreduce failed: {:?}", err);
sender.0.send(Update::Err(json!(message))).ok();
}
}
});
}
#[cfg(test)]
mod tests {
use std::io::prelude::*;
use std::fs::File;
use super::execute_mapreduce;
use std::path::Path;
use indradb::Type;
use statics;
use script;
use indradb::{Datastore, Transaction};
use super::response_chan::Update;
use serde_json::Value as JsonValue;
fn add_seed() {
let trans = statics::DATASTORE.transaction().unwrap();
trans
.create_vertex(&Type::new("foo".to_string()).unwrap())
.unwrap();
}
fn run(file_path_str: &str, input: JsonValue) -> JsonValue {
let file_path = Path::new(file_path_str);
let mut file = File::open(file_path).expect("Could not open script file");
let mut contents = String::new();
file.read_to_string(&mut contents)
.expect("Could not get script file contents");
let (sender, receiver) = script::bounded(1);
execute_mapreduce(contents, file_path_str.to_string(), input, sender);
let update = receiver.0.recv().unwrap();
drop(receiver);
match update {
Update::Err(err) => panic!("{:?}", err),
Update::Ping(_) => panic!("Expected not to get a ping message"),
Update::Ok(message) => message,
}
}
#[test]
fn should_mapreduce() {
add_seed();
let value = run("test_scripts/mapreduce/count.lua", json!(2));
assert!(value.as_f64().unwrap() >= 3.0);
}
#[test]
fn should_commit() {
add_seed();
let value = run("test_scripts/mapreduce/commit_first.lua", JsonValue::Null);
assert!(value == JsonValue::Null);
let value = run("test_scripts/mapreduce/commit_second.lua", JsonValue::Null);
assert!(value.as_f64().unwrap() >= 1.0);
}
}