use std::cell::RefCell;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::rc::Weak;
use std::time::{Duration, Instant};
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::{Scope, Stream};
use crate::server::scheduler::Scheduler;
use crate::sources::Sourceable;
use crate::{AttributeConfig, InputSemantics};
use crate::{Aid, Eid, Value};
use Value::{Bool, Number};
/// A data source backed by a file of line-delimited JSON objects.
///
/// When sourced, the file at `path` is read line by line; every
/// non-empty line is parsed as a JSON object and the configured
/// `attributes` are extracted from it.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct JsonFile {
    /// Path of the file to read. It is opened by every worker that
    /// builds the source operator.
    pub path: String,
    /// Attributes to ingest. Each one is looked up as a key in every
    /// parsed JSON object and gets its own output stream.
    pub attributes: Vec<Aid>,
}
impl Sourceable<Duration> for JsonFile {
fn source<S: Scope<Timestamp = Duration>>(
&self,
scope: &mut S,
t0: Instant,
_scheduler: Weak<RefCell<Scheduler>>,
) -> Vec<(Aid, AttributeConfig, Stream<S, ((Value, Value), Duration, isize)>)> {
let filename = self.path.clone();
let mut demux = OperatorBuilder::new(format!("JsonFile({})", filename), scope.clone());
let operator_info = demux.operator_info();
demux.set_notify(false);
let mut wrappers = Vec::with_capacity(self.attributes.len());
let mut streams = Vec::with_capacity(self.attributes.len());
for _ in self.attributes.iter() {
let (wrapper, stream) = demux.new_output();
wrappers.push(wrapper);
streams.push(stream);
}
let scope_handle = scope.clone();
let attributes = self.attributes.clone();
demux.build(move |mut capabilities| {
let scope = scope_handle;
let activator = scope.activator_for(&operator_info.address[..]);
let worker_index = scope.index();
let num_workers = scope.peers();
let path = Path::new(&filename);
let file = File::open(&path).unwrap();
let reader = BufReader::new(file);
let mut iterator = reader.lines().peekable();
let mut num_objects_read = 0;
let mut object_index = 0;
move |_frontiers| {
let mut handles = Vec::with_capacity(attributes.len());
for wrapper in wrappers.iter_mut() {
handles.push(wrapper.activate());
}
if iterator.peek().is_some() {
let mut sessions = Vec::with_capacity(attributes.len());
for (idx, handle) in handles.iter_mut().enumerate() {
sessions.push(handle.session(capabilities.get(idx).unwrap()));
}
let time = Instant::now().duration_since(t0);
for readline in iterator.by_ref().take(256 - 1) {
let line = readline.expect("read error");
if (object_index % num_workers == worker_index) && !line.is_empty() {
let obj: serde_json::Value = serde_json::from_str(&line).unwrap();
let obj_map = obj.as_object().unwrap();
for (idx, aid) in attributes.iter().enumerate() {
match obj_map.get(aid) {
None => {}
Some(json_value) => {
let v = match *json_value {
serde_json::Value::String(ref s) => Value::String(s.to_string()),
serde_json::Value::Number(ref num) => {
match num.as_i64() {
None => panic!("only i64 supported at the moment"),
Some(num) => Number(num),
}
},
serde_json::Value::Bool(ref b) => Bool(*b),
_ => panic!("only strings, booleans, and i64 types supported at the moment"),
};
let tuple = (Value::Eid(object_index as Eid), v);
sessions.get_mut(idx)
.unwrap()
.give((tuple, time, 1));
}
}
}
num_objects_read += 1;
}
object_index += 1;
}
activator.activate();
} else {
capabilities.drain(..);
}
}
});
streams
}
}