// arthas 0.3.0
//
// Arthas is an in-memory structure database.
// Documentation

use std::cmp::Ordering;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::collections::HashMap;
use super::task::Task;
use error::Error;
use super::Tree;
use super::super::rc::RcChild;
use item::FieldInt;
use serde_json::Value;
use query::QueryType;
use super::super::node::{Group, Groups};
use quickersort::sort_by;
use super::super::math::Math;
use super::task::{Sub, Orders};
use scoped_pool::Pool;
use super::entrance_type::EntranceType;


/// Stateless entry point for running a query `Task` against a `Tree`.
///
/// The type has no fields; its only functionality is the associated function
/// `exec`. The identifier is spelled `Exectuor` (sic) and is kept as is because
/// it is a `pub` name.
pub struct Exectuor {}

impl Exectuor {
    /// Runs every sub-query of `task` against `tree` and returns the number of
    /// selected items together with raw pointers to them.
    ///
    /// Each entry of `task.subs` is drained and, depending on the
    /// `EntranceType` computed for it, searched from the tree's root, minimum
    /// or maximum node as a job on `pool`. Every job stores the `Groups` it
    /// collected and the `Sub` itself under the sub-query's field in two
    /// mutex-guarded maps. Afterwards the groups of the field chosen by
    /// `get_stopped_field` become the candidate set and the remaining subs are
    /// applied to it as filters; if the task has an order, the candidates are
    /// flattened and sorted before filtering.
    ///
    /// Returns `Ok(Default::default())` (a zero count and an empty vector) when
    /// `tree.id_map` is empty. No `Err` value is produced in this function.
    ///
    /// # Panics
    ///
    /// * if a sub-query has no comparisons (`first().unwrap()`);
    /// * if the tree has no root/min/max entry for the field of a scheduled
    ///   sub-query;
    /// * if any lock is poisoned;
    /// * if no stored sub-query is flagged `stopped`, or the chosen field has no
    ///   groups (`unreachable!()`), which includes the case where no job was
    ///   scheduled at all.
    pub fn exec(pool: &Pool,
                tree: &Tree,
                mut task: Task)
                -> Result<(usize, Vec<*const Value>), Error> {
        // Nothing is indexed, so there is nothing to search.
        if tree.id_map.is_empty() {
            thread_trace!("tree is empty");
            return Ok(Default::default());
        }

        // Per-field results filled in by the jobs: the groups each search
        // produced, and the sub-query itself so that its `stopped` flag can be
        // inspected once all jobs are done.
        let field_groups = Mutex::new(HashMap::new());
        let field_sub = Mutex::new(HashMap::new());
        // Flag shared by every search.
        // NOTE(review): presumably lets one search tell the others to stop
        // early; confirm in the node search code.
        let stopped = AtomicBool::new(false);

        pool.scoped(|scope| {
            // Re-bind as references so the `move` closures below capture the
            // references instead of taking ownership of the values.
            let field_groups = &field_groups;
            let field_sub = &field_sub;
            let stopped = &stopped;

            for mut sub in task.subs.drain(..) {
                // Decide where this sub-query's search starts, based on its
                // field and the `other` value of its first comparison.
                let entrance_type = EntranceType::new(tree,
                                                      &sub.field_int,
                                                      &sub.comparisions.first().unwrap().other);

                // The three searching arms differ only in the entry map
                // (`root`/`min`/`max`) and the search method they call.
                match entrance_type {
                    EntranceType::Root => {
                        scope.execute(move || {
                            let mut groups = Groups::new();
                            let rc_node = tree.root
                                .get(&sub.field_int)
                                .unwrap();
                            thread_trace!("search root, root is {:?}",
                                          rc_node.read().unwrap().get_value());

                            rc_node.read()
                                .unwrap()
                                .search_root(stopped, &mut groups, &mut sub);

                            // Publish the result; each lock is held only for
                            // its own insert.
                            field_groups.lock().unwrap().insert(sub.field_int.clone(), groups);
                            field_sub.lock().unwrap().insert(sub.field_int.clone(), sub);
                        });
                    }
                    EntranceType::Min => {
                        scope.execute(move || {
                            let mut groups = Groups::new();
                            let rc_node = tree.min
                                .get(&sub.field_int)
                                .unwrap();

                            thread_trace!("search min, min is {:?}",
                                          rc_node.read().unwrap().get_value());

                            rc_node.read()
                                .unwrap()
                                .search_min(stopped, &mut groups, &mut sub);

                            field_groups.lock().unwrap().insert(sub.field_int.clone(), groups);
                            field_sub.lock().unwrap().insert(sub.field_int.clone(), sub);
                        })
                    }
                    EntranceType::Max => {
                        scope.execute(move || {
                            let mut groups = Groups::new();
                            let rc_node = tree.max
                                .get(&sub.field_int)
                                .unwrap();

                            thread_trace!("search max, max is {:?}",
                                          rc_node.read().unwrap().get_value());

                            rc_node.read()
                                .unwrap()
                                .search_max(stopped, &mut groups, &mut sub);

                            field_groups.lock().unwrap().insert(sub.field_int.clone(), groups);
                            field_sub.lock().unwrap().insert(sub.field_int.clone(), sub);
                        })
                    }
                    EntranceType::None => {
                        // No job is scheduled, so nothing is stored for this
                        // sub-query and it is dropped here.
                        thread_trace!("no root found!");
                    }
                }
            }
        });

        // The jobs' borrows of the two mutexes have ended, so the maps can be
        // taken out.
        // NOTE(review): relies on `pool.scoped` returning only after every
        // scheduled job has finished; confirm against the scoped-pool docs.
        let mut field_groups = field_groups.into_inner().unwrap();
        let mut field_sub = field_sub.into_inner().unwrap();
        let need_sort = task.has_order();
        // Field whose groups serve as the candidate set for the final result.
        let stopped_field = get_stopped_field(&field_sub, &task);

        thread_trace!("stopped field: {:?}", stopped_field);

        if field_groups.contains_key(&stopped_field) {
            // The chosen field's own sub is removed from the filter set, so
            // only the other fields' conditions are re-checked below.
            field_sub.remove(&stopped_field);
            let groups = field_groups.remove(&stopped_field).unwrap();

            thread_trace!("wait for filter, groups len: {:?}", groups.len());

            if need_sort {
                // Ordering needs the whole candidate set at once: flatten,
                // sort, then filter the sorted children.
                let mut children = groups_to_children(groups);
                thread_trace!("children: {:?}", children);
                sort_children(&mut children[..], &task.orders);
                thread_trace!("sorted children: {:?}", children);
                Ok(filter_children(children, &task, &field_sub))
            } else {
                Ok(filter_groups(groups, &task, &field_sub))
            }
        } else {
            // `get_stopped_field` only returns keys of `field_sub`, and every
            // job inserts into both maps, so the key is expected to be present
            // in `field_groups` too.
            unreachable!()
        }
    }
}

#[inline]
fn filter_groups(groups: Groups,
                 task: &Task,
                 field_sub: &HashMap<FieldInt, Sub>)
                 -> (usize, Vec<*const Value>) {
    let mut found = 0;
    let mut count = 0;
    let mut values = Vec::new();
    let other_conditions_exists = !field_sub.is_empty();
    let is_count = task.query_type == QueryType::Count;

    'outer: for group in groups {
        thread_trace!("filter groups, current group: {:?}",
                      &*group.read().unwrap());

        if other_conditions_exists {
            thread_trace!("other conditions exists");
            for rc_child in group.read().unwrap().values() {
                if filter_child(rc_child,
                                &mut found,
                                &mut count,
                                &mut values,
                                task,
                                field_sub) {
                    break 'outer;
                }
            }
        } else if is_count {
            thread_trace!("only count");
            count += group.read().unwrap().len();
            found = count;
        } else {
            thread_trace!("no other conditions exists");
            if collect_child(&mut found, &mut count, task, &group, &mut values) {
                break;
            }
        }
    }

    if is_count {
        if count > task.offset {
            count -= task.offset;
        } else {
            count = 0;
        }
    }

    (count, values)
}

/// Appends the children of one `group` to `values`, honouring the task's
/// offset and limit.
///
/// `found` is incremented for every child visited; `count` is incremented only
/// for children that are actually taken, i.e. those visited after the first
/// `task.offset` ones. Both counters are owned by the caller, so their state
/// carries over from one call to the next.
///
/// Returns `true` as soon as `count` reaches `task.limit`; returns `false`
/// when the group is exhausted first or no limit is set.
///
/// Panics if the group's or a child's lock is poisoned.
#[inline]
fn collect_child(found: &mut usize,
                 count: &mut usize,
                 task: &Task,
                 group: &Group,
                 values: &mut Vec<*const Value>)
                 -> bool {
    let members = group.read().unwrap();

    for member in members.values() {
        *found += 1;

        // Still inside the prefix skipped by the offset.
        if *found <= task.offset {
            continue;
        }

        *count += 1;
        values.push(member.read().unwrap().get_item_pointer());

        if let Some(limit) = task.limit.as_ref() {
            if *count >= *limit {
                return true;
            }
        }
    }

    false
}

/// Runs `filter_child` over `children` in the order given and returns
/// `(count, pointers)` for the ones that were taken.
///
/// Iteration stops at the first child for which `filter_child` reports that
/// the task's limit has been reached.
#[inline]
fn filter_children(children: Vec<RcChild>,
                   task: &Task,
                   field_sub: &HashMap<FieldInt, Sub>)
                   -> (usize, Vec<*const Value>) {
    let mut matched = 0;
    let mut taken = 0;
    let mut pointers = Vec::new();

    for candidate in children {
        let limit_reached = filter_child(&candidate,
                                         &mut matched,
                                         &mut taken,
                                         &mut pointers,
                                         task,
                                         field_sub);

        if limit_reached {
            break;
        }
    }

    (taken, pointers)
}

/// Checks one child against every sub-query in `field_sub` and, if all of
/// them match, records it.
///
/// A matching child increments `found`; once `found` exceeds `task.offset` it
/// also increments `count` and has its item pointer pushed onto `values`.
///
/// Returns `true` when the child matched and `count` has reached `task.limit`,
/// which tells the caller to stop iterating. Returns `false` for a
/// non-matching child, when no limit is set, or while the limit has not been
/// reached.
///
/// Panics if the child's lock is poisoned.
#[inline]
fn filter_child(rc_child: &RcChild,
                found: &mut usize,
                count: &mut usize,
                values: &mut Vec<*const Value>,
                task: &Task,
                field_sub: &HashMap<FieldInt, Sub>)
                -> bool {
    // The first condition that rejects the child ends the check.
    if let Some(failed) = field_sub.values().find(|sub| !sub._match(rc_child)) {
        thread_trace!("not match, left: {:?}, right: {:?}",
                      rc_child,
                      failed.comparisions);

        return false;
    }

    thread_trace!("filter child pass, take: {:?}",
                  rc_child.read().unwrap().get_value());

    *found += 1;

    if *found > task.offset {
        *count += 1;
        values.push(rc_child.read().unwrap().get_item_pointer());
    }

    match task.limit {
        Some(ref limit) => *count >= *limit,
        None => false,
    }
}

/// Sorts `children` in place with quickersort's `sort_by`.
///
/// Two children are compared field by field, following the `(field, order)`
/// pairs of `orders` in sequence; the first pair that yields a non-equal
/// ordering decides. Children that compare equal on every pair (or when
/// `orders` is empty) are reported as `Ordering::Equal`.
///
/// Panics if any of the locks involved is poisoned.
/// NOTE(review): `datas[field]` presumably panics when an item has no entry
/// for an ordered field; confirm against the type of `datas`.
#[inline]
fn sort_children(children: &mut [RcChild], orders: &Orders) {
    sort_by(children,
            &|left, right| {
        let left_child = left.read().unwrap();
        let left_item = left_child.item.read().unwrap();

        let right_child = right.read().unwrap();
        let right_item = right_child.item.read().unwrap();

        for &(ref field, ref order) in orders {
            let left_cell = &left_item.datas[field];
            let left_data = left_cell.read().unwrap();
            let left_value = left_data.get_value();

            let right_cell = &right_item.datas[field];
            let right_data = right_cell.read().unwrap();
            let right_value = right_data.get_value();

            let ordering = left_value.cmp(right_value, order);

            // The first field that differs settles the comparison.
            if ordering != Ordering::Equal {
                return ordering;
            }
        }

        Ordering::Equal
    });
}

/// Flattens `groups` into a single vector holding a clone of every child,
/// keeping the groups' order and each group's own iteration order.
///
/// Panics if a group's lock is poisoned.
#[inline]
fn groups_to_children(groups: Vec<Group>) -> Vec<RcChild> {
    let mut flattened = Vec::new();

    for group in groups {
        let members = group.read().unwrap();
        flattened.extend(members.values().cloned());
    }

    flattened
}

/// Picks the field whose sub-query is flagged `stopped`.
///
/// If the task has an order and the sub-query for the order field exists and
/// is stopped, that field is preferred. Otherwise the first stopped entry met
/// while iterating `field_sub` is returned.
///
/// Panics via `unreachable!()` when no sub-query in `field_sub` is stopped,
/// which includes an empty map.
#[inline]
fn get_stopped_field(field_sub: &HashMap<FieldInt, Sub>, task: &Task) -> String {
    if task.has_order() {
        let order_field = task.get_order_field();
        let order_sub_stopped = field_sub.get(order_field).map_or(false, |sub| sub.stopped);

        if order_sub_stopped {
            return order_field.to_owned();
        }
    }

    match field_sub.iter().find(|&(_, sub)| sub.stopped) {
        Some((field, _)) => field.to_owned(),
        None => unreachable!(),
    }
}