use super::{Step, StepResult, Traverser, TraverserRequirement, TraverserValue};
use crate::json;
use crate::serde_json::Value;
use std::any::Any;
use std::collections::HashMap;
/// Extension of [`Step`] for steps that buffer traversers before emitting results.
///
/// Every implementor in this file accumulates state in `add_to_barrier` and
/// hands the result back from `flush_barrier`, clearing its buffer as it does so.
pub trait BarrierStep: Step {
/// Folds `traverser` into the step's buffered state.
fn add_to_barrier(&mut self, traverser: Traverser);
/// Emits the traversers built from the buffered state.
fn flush_barrier(&mut self) -> Vec<Traverser>;
/// Reports whether the step currently holds buffered state.
fn has_data(&self) -> bool;
/// Presumably signals that the barrier may be flushed (confirm against the
/// executor); every implementor in this file returns `true`.
fn is_ready(&self) -> bool;
}
/// Generic state holder for a reducing barrier over an accumulator of type `T`.
///
/// NOTE(review): no constructor, `Step` impl, or `BarrierStep` impl for this
/// type is visible in this file, so the field descriptions below are inferred
/// from names only and should be confirmed.
#[derive(Debug, Clone)]
pub struct ReducingBarrierStep<T: Clone + Send + Sync + std::fmt::Debug> {
/// Step identifier.
id: String,
/// Labels attached to the step.
labels: Vec<String>,
/// Presumably the initial accumulator value — TODO confirm.
seed: T,
/// Presumably the running reduction result, `None` until first use — TODO confirm.
accumulator: Option<T>,
/// Presumably a human-readable name of the reduction — TODO confirm.
reducer_name: String,
}
/// Barrier step that gathers the JSON form of every incoming value and emits
/// them as a single list traverser when flushed.
#[derive(Debug, Clone)]
pub struct FoldStep {
/// Identifier returned by `Step::id`.
id: String,
/// Labels attached through `Step::add_label`.
labels: Vec<String>,
/// JSON values buffered since the last flush or reset.
values: Vec<Value>,
}
impl FoldStep {
    /// Creates an empty fold step with the id `"fold_0"` and no labels.
    pub fn new() -> Self {
        let id = String::from("fold_0");
        Self {
            id,
            labels: vec![],
            values: vec![],
        }
    }
}
impl Default for FoldStep {
fn default() -> Self {
Self::new()
}
}
impl Step for FoldStep {
    /// Returns this step's identifier.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns the fixed step name `"FoldStep"`.
    fn name(&self) -> &str {
        "FoldStep"
    }

    /// Returns the labels attached so far, in insertion order.
    fn labels(&self) -> &[String] {
        self.labels.as_slice()
    }

    /// Attaches `label` unless an equal label is already present.
    fn add_label(&mut self, label: String) {
        let already_present = self.labels.iter().any(|existing| *existing == label);
        if already_present {
            return;
        }
        self.labels.push(label);
    }

    /// Returns the static requirement list `[Barrier]`.
    fn requirements(&self) -> &[TraverserRequirement] {
        static BARRIER_ONLY: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
        BARRIER_ONLY
    }

    /// Wraps the incoming traverser in a single-element `StepResult::Hold`.
    fn process_traverser(&self, traverser: Traverser) -> StepResult {
        let held = vec![traverser];
        StepResult::Hold(held)
    }

    /// Discards every buffered value.
    fn reset(&mut self) {
        self.values.clear();
    }

    /// Returns a boxed clone of this step, including its buffered values.
    fn clone_step(&self) -> Box<dyn Step> {
        let copy = self.clone();
        Box::new(copy)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
impl BarrierStep for FoldStep {
    /// Buffers the JSON form of the traverser's value once per unit of bulk.
    ///
    /// The value is converted to JSON a single time and then cloned, rather
    /// than being re-serialised on every iteration.
    fn add_to_barrier(&mut self, traverser: Traverser) {
        let json = traverser.value().to_json();
        self.values
            .extend((0..traverser.bulk()).map(|_| json.clone()));
    }

    /// Emits exactly one traverser holding the buffered values as a list and
    /// leaves the buffer empty.
    ///
    /// An empty buffer yields a traverser holding an empty list.
    fn flush_barrier(&mut self) -> Vec<Traverser> {
        // `take` already yields an empty Vec when nothing was buffered, so no
        // separate empty-case branch is needed.
        let collected = std::mem::take(&mut self.values);
        vec![Traverser::with_value(TraverserValue::List(collected))]
    }

    /// Returns `true` when at least one value is buffered.
    fn has_data(&self) -> bool {
        !self.values.is_empty()
    }

    /// Always returns `true`.
    fn is_ready(&self) -> bool {
        true
    }
}
/// Barrier step that gathers the JSON form of incoming values into a single
/// collection whose admission rule is chosen by `container`.
#[derive(Debug, Clone)]
pub struct CollectingBarrierStep {
/// Identifier returned by `Step::id`.
id: String,
/// Labels attached through `Step::add_label`.
labels: Vec<String>,
/// JSON values buffered since the last flush or reset.
values: Vec<Value>,
/// Which collection flavour this step builds.
container: ContainerType,
}
/// Collection flavour built by a [`CollectingBarrierStep`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ContainerType {
/// Selected by `CollectingBarrierStep::to_list`; step name `"ToListStep"`.
List,
/// Selected by `CollectingBarrierStep::to_set`; step name `"ToSetStep"`.
Set,
/// Selected by `CollectingBarrierStep::to_bulk_map`; step name `"ToBulkSetStep"`.
BulkMap,
}
impl CollectingBarrierStep {
pub fn to_list() -> Self {
Self {
id: "toList_0".to_string(),
labels: Vec::new(),
values: Vec::new(),
container: ContainerType::List,
}
}
pub fn to_set() -> Self {
Self {
id: "toSet_0".to_string(),
labels: Vec::new(),
values: Vec::new(),
container: ContainerType::Set,
}
}
pub fn to_bulk_map() -> Self {
Self {
id: "toBulkSet_0".to_string(),
labels: Vec::new(),
values: Vec::new(),
container: ContainerType::BulkMap,
}
}
}
impl Step for CollectingBarrierStep {
    /// Returns this step's identifier.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns the step name that corresponds to the configured container.
    fn name(&self) -> &str {
        let step_name = match self.container {
            ContainerType::List => "ToListStep",
            ContainerType::Set => "ToSetStep",
            ContainerType::BulkMap => "ToBulkSetStep",
        };
        step_name
    }

    /// Returns the labels attached so far, in insertion order.
    fn labels(&self) -> &[String] {
        self.labels.as_slice()
    }

    /// Attaches `label` unless an equal label is already present.
    fn add_label(&mut self, label: String) {
        let already_present = self.labels.iter().any(|existing| *existing == label);
        if already_present {
            return;
        }
        self.labels.push(label);
    }

    /// Returns the static requirement list `[Barrier]`.
    fn requirements(&self) -> &[TraverserRequirement] {
        static BARRIER_ONLY: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
        BARRIER_ONLY
    }

    /// Wraps the incoming traverser in a single-element `StepResult::Hold`.
    fn process_traverser(&self, traverser: Traverser) -> StepResult {
        let held = vec![traverser];
        StepResult::Hold(held)
    }

    /// Discards every buffered value.
    fn reset(&mut self) {
        self.values.clear();
    }

    /// Returns a boxed clone of this step, including its buffered values.
    fn clone_step(&self) -> Box<dyn Step> {
        let copy = self.clone();
        Box::new(copy)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
impl BarrierStep for CollectingBarrierStep {
    /// Buffers the JSON form of the traverser's value according to the
    /// configured container:
    ///
    /// * `List` and `BulkMap` keep one copy per unit of bulk, so a bulked
    ///   traverser contributes its full multiplicity.
    /// * `Set` keeps a value only the first time it is seen.
    fn add_to_barrier(&mut self, traverser: Traverser) {
        let value = traverser.value().to_json();
        match self.container {
            // `BulkMap` previously stored a single copy regardless of bulk,
            // silently dropping the count carried by the traverser.
            ContainerType::List | ContainerType::BulkMap => {
                self.values
                    .extend((0..traverser.bulk()).map(|_| value.clone()));
            }
            ContainerType::Set => {
                // Linear scan: `Value` is compared with `==`, not hashed.
                if !self.values.contains(&value) {
                    self.values.push(value);
                }
            }
        }
    }

    /// Emits exactly one traverser holding the buffered values as a list and
    /// leaves the buffer empty.
    fn flush_barrier(&mut self) -> Vec<Traverser> {
        let collected = std::mem::take(&mut self.values);
        vec![Traverser::with_value(TraverserValue::List(collected))]
    }

    /// Returns `true` when at least one value is buffered.
    fn has_data(&self) -> bool {
        !self.values.is_empty()
    }

    /// Always returns `true`.
    fn is_ready(&self) -> bool {
        true
    }
}
/// Barrier step that buckets the JSON form of incoming values under a string
/// key and emits all buckets as one map traverser when flushed.
#[derive(Debug, Clone)]
pub struct GroupStep {
/// Identifier returned by `Step::id`.
id: String,
/// Labels attached through `Step::add_label`.
labels: Vec<String>,
/// Buffered values per key since the last flush or reset.
groups: HashMap<String, Vec<Value>>,
/// Set to `'k'` by `new` and `reset`; never read anywhere in this file.
/// NOTE(review): looks like a leftover key/value mode marker — confirm before relying on it.
state: char,
}
impl GroupStep {
    /// Creates an empty group step with the id `"group_0"`, no labels, and
    /// `state` set to `'k'`.
    pub fn new() -> Self {
        let id = String::from("group_0");
        Self {
            id,
            labels: vec![],
            groups: HashMap::default(),
            state: 'k',
        }
    }
}
impl Default for GroupStep {
fn default() -> Self {
Self::new()
}
}
impl Step for GroupStep {
    /// Returns this step's identifier.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns the fixed step name `"GroupStep"`.
    fn name(&self) -> &str {
        "GroupStep"
    }

    /// Returns the labels attached so far, in insertion order.
    fn labels(&self) -> &[String] {
        self.labels.as_slice()
    }

    /// Attaches `label` unless an equal label is already present.
    fn add_label(&mut self, label: String) {
        let already_present = self.labels.iter().any(|existing| *existing == label);
        if already_present {
            return;
        }
        self.labels.push(label);
    }

    /// Returns the static requirement list `[Barrier]`.
    fn requirements(&self) -> &[TraverserRequirement] {
        static BARRIER_ONLY: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
        BARRIER_ONLY
    }

    /// Wraps the incoming traverser in a single-element `StepResult::Hold`.
    fn process_traverser(&self, traverser: Traverser) -> StepResult {
        let held = vec![traverser];
        StepResult::Hold(held)
    }

    /// Discards every bucket and restores `state` to `'k'`.
    fn reset(&mut self) {
        self.state = 'k';
        self.groups.clear();
    }

    /// Returns a boxed clone of this step, including its buffered groups.
    fn clone_step(&self) -> Box<dyn Step> {
        let copy = self.clone();
        Box::new(copy)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
impl BarrierStep for GroupStep {
    /// Appends the JSON form of the traverser's value to the bucket for its
    /// key, once per unit of bulk.
    ///
    /// The key is the inner string for `String` and `Vertex` values and the
    /// `Debug` rendering for every other variant. Honouring bulk keeps this
    /// step consistent with `FoldStep` and `GroupCountStep`; previously a
    /// bulked traverser contributed only a single entry.
    fn add_to_barrier(&mut self, traverser: Traverser) {
        let key = match traverser.value() {
            TraverserValue::String(s) => s.clone(),
            TraverserValue::Vertex(id) => id.clone(),
            other => format!("{:?}", other),
        };
        let json = traverser.value().to_json();
        let bucket = self.groups.entry(key).or_default();
        bucket.extend((0..traverser.bulk()).map(|_| json.clone()));
    }

    /// Emits exactly one traverser holding a map from key to a JSON array of
    /// that key's values, and leaves the step with no buckets.
    fn flush_barrier(&mut self) -> Vec<Traverser> {
        let result: HashMap<String, Value> = self
            .groups
            .drain()
            .map(|(key, values)| (key, Value::Array(values)))
            .collect();
        vec![Traverser::with_value(TraverserValue::Map(result))]
    }

    /// Returns `true` when at least one bucket exists.
    fn has_data(&self) -> bool {
        !self.groups.is_empty()
    }

    /// Always returns `true`.
    fn is_ready(&self) -> bool {
        true
    }
}
/// Barrier step that tallies traverser bulk per string key and emits the
/// tallies as one map traverser when flushed.
#[derive(Debug, Clone)]
pub struct GroupCountStep {
/// Identifier returned by `Step::id`.
id: String,
/// Labels attached through `Step::add_label`.
labels: Vec<String>,
/// Accumulated bulk per key since the last flush or reset.
counts: HashMap<String, u64>,
}
impl GroupCountStep {
    /// Creates an empty group-count step with the id `"groupCount_0"` and no labels.
    pub fn new() -> Self {
        let id = String::from("groupCount_0");
        Self {
            id,
            labels: vec![],
            counts: HashMap::default(),
        }
    }
}
impl Default for GroupCountStep {
fn default() -> Self {
Self::new()
}
}
impl Step for GroupCountStep {
    /// Returns this step's identifier.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns the fixed step name `"GroupCountStep"`.
    fn name(&self) -> &str {
        "GroupCountStep"
    }

    /// Returns the labels attached so far, in insertion order.
    fn labels(&self) -> &[String] {
        self.labels.as_slice()
    }

    /// Attaches `label` unless an equal label is already present.
    fn add_label(&mut self, label: String) {
        let already_present = self.labels.iter().any(|existing| *existing == label);
        if already_present {
            return;
        }
        self.labels.push(label);
    }

    /// Returns the static requirement list `[Barrier, Bulk]`.
    fn requirements(&self) -> &[TraverserRequirement] {
        static BARRIER_AND_BULK: &[TraverserRequirement] =
            &[TraverserRequirement::Barrier, TraverserRequirement::Bulk];
        BARRIER_AND_BULK
    }

    /// Wraps the incoming traverser in a single-element `StepResult::Hold`.
    fn process_traverser(&self, traverser: Traverser) -> StepResult {
        let held = vec![traverser];
        StepResult::Hold(held)
    }

    /// Discards every tally.
    fn reset(&mut self) {
        self.counts.clear();
    }

    /// Returns a boxed clone of this step, including its tallies.
    fn clone_step(&self) -> Box<dyn Step> {
        let copy = self.clone();
        Box::new(copy)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
impl BarrierStep for GroupCountStep {
    /// Adds the traverser's bulk to the tally for its key.
    ///
    /// The key is the inner string for `String` and `Vertex` values and the
    /// `Debug` rendering for every other variant.
    fn add_to_barrier(&mut self, traverser: Traverser) {
        let key = match traverser.value() {
            TraverserValue::String(text) => text.clone(),
            TraverserValue::Vertex(vertex_id) => vertex_id.clone(),
            other => format!("{:?}", other),
        };
        let tally = self.counts.entry(key).or_default();
        *tally += traverser.bulk();
    }

    /// Emits exactly one traverser holding a map from key to its tally as a
    /// JSON number, and leaves the step with no tallies.
    fn flush_barrier(&mut self) -> Vec<Traverser> {
        let mut result: HashMap<String, Value> = HashMap::with_capacity(self.counts.len());
        for (key, count) in self.counts.drain() {
            result.insert(key, json!(count));
        }
        vec![Traverser::with_value(TraverserValue::Map(result))]
    }

    /// Returns `true` when at least one key has been tallied.
    fn has_data(&self) -> bool {
        !self.counts.is_empty()
    }

    /// Always returns `true`.
    fn is_ready(&self) -> bool {
        true
    }
}
/// Barrier step that buffers traversers and emits them sorted when flushed.
#[derive(Debug, Clone)]
pub struct OrderStep {
/// Identifier returned by `Step::id`.
id: String,
/// Labels attached through `Step::add_label`.
labels: Vec<String>,
/// Traversers buffered since the last flush or reset.
traversers: Vec<Traverser>,
/// `true` for ascending order, `false` for descending.
ascending: bool,
}
impl OrderStep {
    /// Builds an empty, unlabelled step with the given id and sort direction.
    fn with_direction(id: &str, ascending: bool) -> Self {
        Self {
            id: id.to_string(),
            labels: Vec::new(),
            traversers: Vec::new(),
            ascending,
        }
    }

    /// Creates an ascending order step with the id `"order_0"`.
    pub fn new() -> Self {
        Self::with_direction("order_0", true)
    }

    /// Creates a descending order step with the id `"order_desc_0"`.
    pub fn descending() -> Self {
        Self::with_direction("order_desc_0", false)
    }
}
impl Default for OrderStep {
fn default() -> Self {
Self::new()
}
}
impl Step for OrderStep {
    /// Returns this step's identifier.
    fn id(&self) -> &str {
        self.id.as_str()
    }

    /// Returns the fixed step name `"OrderStep"`.
    fn name(&self) -> &str {
        "OrderStep"
    }

    /// Returns the labels attached so far, in insertion order.
    fn labels(&self) -> &[String] {
        self.labels.as_slice()
    }

    /// Attaches `label` unless an equal label is already present.
    fn add_label(&mut self, label: String) {
        let already_present = self.labels.iter().any(|existing| *existing == label);
        if already_present {
            return;
        }
        self.labels.push(label);
    }

    /// Returns the static requirement list `[Barrier]`.
    fn requirements(&self) -> &[TraverserRequirement] {
        static BARRIER_ONLY: &[TraverserRequirement] = &[TraverserRequirement::Barrier];
        BARRIER_ONLY
    }

    /// Wraps the incoming traverser in a single-element `StepResult::Hold`.
    fn process_traverser(&self, traverser: Traverser) -> StepResult {
        let held = vec![traverser];
        StepResult::Hold(held)
    }

    /// Discards every buffered traverser.
    fn reset(&mut self) {
        self.traversers.clear();
    }

    /// Returns a boxed clone of this step, including its buffered traversers.
    fn clone_step(&self) -> Box<dyn Step> {
        let copy = self.clone();
        Box::new(copy)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}
impl BarrierStep for OrderStep {
fn add_to_barrier(&mut self, traverser: Traverser) {
self.traversers.push(traverser);
}
fn flush_barrier(&mut self) -> Vec<Traverser> {
let ascending = self.ascending;
self.traversers.sort_by(|a, b| {
let a_str = format!("{:?}", a.value());
let b_str = format!("{:?}", b.value());
if ascending {
a_str.cmp(&b_str)
} else {
b_str.cmp(&a_str)
}
});
std::mem::take(&mut self.traversers)
}
fn has_data(&self) -> bool {
!self.traversers.is_empty()
}
fn is_ready(&self) -> bool {
true
}
}
/// Barrier step that adds up numeric traverser values, weighted by bulk, and
/// emits the total as a single `Float` traverser when flushed.
#[derive(Debug, Clone)]
pub struct SumStep {
    /// Identifier returned by `Step::id`.
    id: String,
    /// Labels attached through `Step::add_label`.
    labels: Vec<String>,
    /// Running total since the last flush or reset.
    sum: f64,
    /// Whether any traverser has been added since the last flush or reset.
    ///
    /// Tracked separately because a total of `0.0` is a legitimate result
    /// (for example the inputs `[0]` or `[5, -5]`) and cannot double as an
    /// "empty" marker.
    received: bool,
}

impl SumStep {
    /// Creates an empty sum step with the id `"sum_0"`, no labels, and a total of `0.0`.
    pub fn new() -> Self {
        Self {
            id: "sum_0".to_string(),
            labels: Vec::new(),
            sum: 0.0,
            received: false,
        }
    }
}

impl Default for SumStep {
    fn default() -> Self {
        Self::new()
    }
}

impl Step for SumStep {
    /// Returns this step's identifier.
    fn id(&self) -> &str {
        &self.id
    }

    /// Returns the fixed step name `"SumStep"`.
    fn name(&self) -> &str {
        "SumStep"
    }

    /// Returns the labels attached so far, in insertion order.
    fn labels(&self) -> &[String] {
        &self.labels
    }

    /// Attaches `label` unless an equal label is already present.
    fn add_label(&mut self, label: String) {
        if !self.labels.contains(&label) {
            self.labels.push(label);
        }
    }

    /// Returns the static requirement list `[Barrier, Bulk]`.
    fn requirements(&self) -> &[TraverserRequirement] {
        static REQS: &[TraverserRequirement] =
            &[TraverserRequirement::Barrier, TraverserRequirement::Bulk];
        REQS
    }

    /// Wraps the incoming traverser in a single-element `StepResult::Hold`.
    fn process_traverser(&self, traverser: Traverser) -> StepResult {
        StepResult::Hold(vec![traverser])
    }

    /// Clears the running total and the "data received" marker.
    fn reset(&mut self) {
        self.sum = 0.0;
        self.received = false;
    }

    /// Returns a boxed clone of this step, including its running total.
    fn clone_step(&self) -> Box<dyn Step> {
        Box::new(self.clone())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
}

impl BarrierStep for SumStep {
    /// Adds `value * bulk` to the running total.
    ///
    /// `Integer` and `Float` values contribute their numeric value; every
    /// other variant contributes `0.0`. Any added traverser marks the step
    /// as holding data.
    fn add_to_barrier(&mut self, traverser: Traverser) {
        let value = match traverser.value() {
            TraverserValue::Integer(i) => *i as f64,
            TraverserValue::Float(f) => *f,
            _ => 0.0,
        };
        self.sum += value * traverser.bulk() as f64;
        self.received = true;
    }

    /// Emits exactly one `Float` traverser holding the total, then resets the
    /// total to `0.0` and clears the "data received" marker.
    fn flush_barrier(&mut self) -> Vec<Traverser> {
        let total = std::mem::replace(&mut self.sum, 0.0);
        self.received = false;
        vec![Traverser::with_value(TraverserValue::Float(total))]
    }

    /// Returns `true` when at least one traverser has been added since the
    /// last flush or reset, even if the running total is `0.0`.
    fn has_data(&self) -> bool {
        self.received
    }

    /// Always returns `true`.
    fn is_ready(&self) -> bool {
        true
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the length of the list held by `traverser`, failing the test
    /// if it holds any other variant.
    fn list_len(traverser: &Traverser) -> usize {
        match traverser.value() {
            TraverserValue::List(items) => items.len(),
            other => panic!("Expected list, got {:?}", other),
        }
    }

    /// Returns the map held by `traverser`, failing the test if it holds any
    /// other variant.
    fn expect_map(traverser: &Traverser) -> &HashMap<String, Value> {
        match traverser.value() {
            TraverserValue::Map(map) => map,
            other => panic!("Expected map, got {:?}", other),
        }
    }

    /// Returns the string payload of every traverser, failing the test on
    /// any non-string value.
    fn string_values(traversers: &[Traverser]) -> Vec<&str> {
        traversers
            .iter()
            .map(|traverser| match traverser.value() {
                TraverserValue::String(text) => text.as_str(),
                other => panic!("Expected string, got {:?}", other),
            })
            .collect()
    }

    /// Builds a traverser carrying a string value.
    fn string_traverser(text: &str) -> Traverser {
        Traverser::with_value(TraverserValue::String(text.to_string()))
    }

    #[test]
    fn test_fold_step() {
        let mut step = FoldStep::new();
        step.add_to_barrier(Traverser::new("v1"));
        step.add_to_barrier(Traverser::new("v2"));
        step.add_to_barrier(Traverser::new("v3"));
        let result = step.flush_barrier();
        assert_eq!(result.len(), 1);
        assert_eq!(list_len(&result[0]), 3);
    }

    #[test]
    fn test_fold_with_bulk() {
        let mut step = FoldStep::new();
        let mut t = Traverser::new("v1");
        t.set_bulk(3);
        step.add_to_barrier(t);
        let result = step.flush_barrier();
        assert_eq!(result.len(), 1);
        assert_eq!(list_len(&result[0]), 3);
    }

    #[test]
    fn test_fold_empty_flush() {
        // Flushing with nothing buffered still yields one empty list.
        let mut step = FoldStep::new();
        assert!(!step.has_data());
        let result = step.flush_barrier();
        assert_eq!(result.len(), 1);
        assert_eq!(list_len(&result[0]), 0);
    }

    #[test]
    fn test_group_step() {
        let mut step = GroupStep::new();
        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));
        step.add_to_barrier(string_traverser("a"));
        let result = step.flush_barrier();
        assert_eq!(result.len(), 1);
        let map = expect_map(&result[0]);
        assert_eq!(map.len(), 2);
        let group_size =
            |key: &str| map.get(key).and_then(Value::as_array).map(|items| items.len());
        assert_eq!(group_size("a"), Some(2));
        assert_eq!(group_size("b"), Some(1));
    }

    #[test]
    fn test_group_count_step() {
        let mut step = GroupCountStep::new();
        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));
        let mut t = string_traverser("a");
        t.set_bulk(2);
        step.add_to_barrier(t);
        let result = step.flush_barrier();
        assert_eq!(result.len(), 1);
        let map = expect_map(&result[0]);
        assert_eq!(map.get("a"), Some(&json!(3)));
        assert_eq!(map.get("b"), Some(&json!(1)));
    }

    #[test]
    fn test_order_step() {
        let mut step = OrderStep::new();
        step.add_to_barrier(string_traverser("c"));
        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));
        let result = step.flush_barrier();
        assert_eq!(result.len(), 3);
        assert_eq!(string_values(&result), vec!["a", "b", "c"]);
        assert!(!step.has_data());
    }

    #[test]
    fn test_order_step_descending() {
        let mut step = OrderStep::descending();
        step.add_to_barrier(string_traverser("c"));
        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));
        let result = step.flush_barrier();
        assert_eq!(string_values(&result), vec!["c", "b", "a"]);
    }

    #[test]
    fn test_sum_step() {
        let mut step = SumStep::new();
        step.add_to_barrier(Traverser::with_value(TraverserValue::Integer(10)));
        step.add_to_barrier(Traverser::with_value(TraverserValue::Integer(20)));
        step.add_to_barrier(Traverser::with_value(TraverserValue::Integer(30)));
        let result = step.flush_barrier();
        assert_eq!(result.len(), 1);
        match result[0].value() {
            TraverserValue::Float(sum) => assert_eq!(*sum, 60.0),
            other => panic!("Expected float, got {:?}", other),
        }
    }

    #[test]
    fn test_collecting_to_set() {
        let mut step = CollectingBarrierStep::to_set();
        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("a"));
        step.add_to_barrier(string_traverser("b"));
        let result = step.flush_barrier();
        assert_eq!(result.len(), 1);
        assert_eq!(list_len(&result[0]), 2);
    }
}