plexus-engine 0.3.4

Engine integration traits for consuming Plexus plans
Documentation
use super::*;
use plexus_conformance::{
    ConformanceCase, ConformanceGraphMode, ConformanceSeedNode, ConformanceSeedRelationship,
    QueryResult as ConfQueryResult, Value as ConfValue,
};
use plexus_gql::parse_gql_with_metadata;
use plexus_ir::{lower_to_plan, lower_to_plan_with_match_graph_refs};
use plexus_serde::{deserialize_plan, serialize_plan, Plan};
use std::collections::HashSet;

/// Builds an engine-local [`Node`] from a conformance seed node.
///
/// The id is copied, the labels are cloned into a `HashSet`, and every property value is
/// converted with [`conf_value_to_local`] while its key is cloned.
fn node_from_seed(seed: &ConformanceSeedNode) -> Node {
    let labels: HashSet<_> = seed.labels.iter().cloned().collect();
    let props = seed
        .props
        .iter()
        .map(|(key, value)| (key.clone(), conf_value_to_local(value)))
        .collect();

    Node {
        id: seed.id,
        labels,
        props,
    }
}

/// Builds an engine-local [`Relationship`] from a conformance seed relationship.
///
/// The id and both endpoint ids are copied, the type is cloned, and every property value is
/// converted with [`conf_value_to_local`] while its key is cloned.
fn rel_from_seed(seed: &ConformanceSeedRelationship) -> Relationship {
    let props = seed
        .props
        .iter()
        .map(|(key, value)| (key.clone(), conf_value_to_local(value)))
        .collect();

    Relationship {
        id: seed.id,
        src: seed.src,
        dst: seed.dst,
        typ: seed.typ.clone(),
        props,
    }
}

/// Converts a conformance-suite value into the engine-local [`Value`].
///
/// Each variant maps to the variant of the same name; lists and maps are converted
/// recursively, element by element.
fn conf_value_to_local(v: &ConfValue) -> Value {
    match v {
        ConfValue::Null => Value::Null,
        ConfValue::Bool(flag) => Value::Bool(*flag),
        ConfValue::Int(int) => Value::Int(*int),
        ConfValue::Float(float) => Value::Float(*float),
        ConfValue::String(text) => Value::String(text.clone()),
        ConfValue::NodeRef(node_id) => Value::NodeRef(*node_id),
        ConfValue::RelRef(rel_id) => Value::RelRef(*rel_id),
        ConfValue::List(items) => {
            let converted = items.iter().map(conf_value_to_local).collect();
            Value::List(converted)
        }
        ConfValue::Map(entries) => {
            let converted = entries
                .iter()
                .map(|(key, value)| (key.clone(), conf_value_to_local(value)))
                .collect();
            Value::Map(converted)
        }
    }
}

/// Converts an engine-local [`Value`] into the conformance-suite value type.
///
/// This is the inverse direction of [`conf_value_to_local`]: each variant maps to the variant
/// of the same name, and lists and maps are converted recursively.
fn local_value_to_conf(v: &Value) -> ConfValue {
    match v {
        Value::Null => ConfValue::Null,
        Value::Bool(flag) => ConfValue::Bool(*flag),
        Value::Int(int) => ConfValue::Int(*int),
        Value::Float(float) => ConfValue::Float(*float),
        Value::String(text) => ConfValue::String(text.clone()),
        Value::NodeRef(node_id) => ConfValue::NodeRef(*node_id),
        Value::RelRef(rel_id) => ConfValue::RelRef(*rel_id),
        Value::List(items) => {
            let converted = items.iter().map(local_value_to_conf).collect();
            ConfValue::List(converted)
        }
        Value::Map(entries) => {
            let converted = entries
                .iter()
                .map(|(key, value)| (key.clone(), local_value_to_conf(value)))
                .collect();
            ConfValue::Map(converted)
        }
    }
}

/// Converts an engine [`QueryResult`] into the conformance-suite result type.
///
/// Consumes `out` and converts every value of every row with [`local_value_to_conf`].
fn local_result_to_conf(out: QueryResult) -> ConfQueryResult {
    let rows = out
        .rows
        .into_iter()
        .map(|row| row.iter().map(local_value_to_conf).collect())
        .collect();

    ConfQueryResult { rows }
}

/// Selects the starting graph for a case from its `setup.graph_mode`.
///
/// An unset mode is treated the same as `Fixture` and yields `fixture_graph()`; `Empty` yields
/// `Graph::default()`.
fn base_graph(case: &ConformanceCase) -> Graph {
    match case.setup.graph_mode {
        None | Some(ConformanceGraphMode::Fixture) => fixture_graph(),
        Some(ConformanceGraphMode::Empty) => Graph::default(),
    }
}

/// Builds the full input graph for a case.
///
/// Starts from [`base_graph`] and then appends the case's seed nodes and seed relationships,
/// converted with [`node_from_seed`] and [`rel_from_seed`] respectively.
fn graph_for_case(case: &ConformanceCase) -> Graph {
    let setup = &case.setup;
    let mut graph = base_graph(case);

    let seeded_nodes = setup.seed_nodes.iter().map(node_from_seed);
    graph.nodes.extend(seeded_nodes);

    let seeded_rels = setup.seed_relationships.iter().map(rel_from_seed);
    graph.rels.extend(seeded_rels);

    graph
}

/// Runs every setup query against `engine`, in order.
///
/// Each query is parsed, lowered to a serialized plan, and executed; the query's result rows are
/// discarded.
///
/// # Errors
///
/// Stops at the first query that fails to parse, lower, or execute and returns that error
/// rendered as a string. Queries after the failing one are not run.
fn execute_setup_queries(
    engine: &mut InMemoryEngine,
    setup_queries: &[String],
) -> Result<(), String> {
    setup_queries
        .iter()
        .try_for_each(|text| -> Result<(), String> {
            let parsed = parse(text).map_err(|e| e.to_string())?;
            let plan_bytes = lower_to_flatbuffer(&parsed).map_err(|e| e.to_string())?;
            // Only success or failure matters here; the produced rows are dropped.
            execute_serialized(engine, &plan_bytes)
                .map(|_| ())
                .map_err(|e| e.to_string())
        })
}

/// Runs one conformance case end to end and returns the engine's result.
///
/// Steps, in order:
/// 1. build an [`InMemoryEngine`] over [`graph_for_case`];
/// 2. run the case's setup queries;
/// 3. lower the case query (through `pipeline` when one is given), validate the plan against
///    any capability profile requested by the case's tags, and execute the serialized plan;
/// 4. when `case.setup.assert_no_side_effects` is set, check that the graph is unchanged.
///
/// # Errors
///
/// Returns the stringified error of whichever step failed. A capability-validation failure is
/// returned immediately, without running the side-effect check. When the side-effect check
/// fails, its error replaces the query's own result (successful or not).
fn execute_case(case: &ConformanceCase, pipeline: Option<&str>) -> Result<QueryResult, String> {
    let mut engine = InMemoryEngine::new(graph_for_case(case));
    execute_setup_queries(&mut engine, &case.setup.setup_queries)?;

    // Snapshot the graph only when the case asks for the read-only check; cloning the whole
    // graph for every other case would be wasted work.
    let graph_before_query = case
        .setup
        .assert_no_side_effects
        .then(|| engine.graph.clone());

    let result = match lower_case_plan_and_bytes(case, pipeline) {
        Ok((plan, bytes)) => {
            validate_case_capabilities(case, &plan)?;
            execute_serialized(&mut engine, &bytes).map_err(|e| e.to_string())
        }
        Err(e) => Err(e),
    };

    if let Some(before) = graph_before_query {
        if engine.graph != before {
            return Err(format!("readonly case mutated graph state: {}", case.name));
        }
    }

    result
}

/// Returns `true` when any of the case's tags equals `tag`, ignoring ASCII case.
fn has_tag(case: &ConformanceCase, tag: &str) -> bool {
    for candidate in case.tags.iter() {
        if candidate.eq_ignore_ascii_case(tag) {
            return true;
        }
    }
    false
}

fn is_gql_case(case: &ConformanceCase) -> bool {
    has_tag(case, "gql")
}

/// Lowers the case's query into a [`Plan`] together with its serialized bytes.
///
/// Three paths exist:
/// - cases tagged `gql` are parsed with `parse_gql_with_metadata`, lowered with the parsed
///   match-graph references, and serialized; `pipeline` is not consulted on this path;
/// - with `Some(pipeline)`, the query is lowered to a module, optimized with that pipeline,
///   serialized, and the plan is read back from the serialized bytes;
/// - with `None`, the plan and the bytes are each lowered directly from the parsed query.
///
/// # Errors
///
/// Returns the stringified error of the first parse, lower, optimize, serialize, or
/// deserialize step that fails.
fn lower_case_plan_and_bytes(
    case: &ConformanceCase,
    pipeline: Option<&str>,
) -> Result<(Plan, Vec<u8>), String> {
    if is_gql_case(case) {
        let (gql_query, gql_meta) =
            parse_gql_with_metadata(&case.query).map_err(|e| e.to_string())?;
        let plan = lower_to_plan_with_match_graph_refs(&gql_query, &gql_meta.match_graph_refs)
            .map_err(|e| e.to_string())?;
        let bytes = serialize_plan(&plan).map_err(|e| e.to_string())?;
        return Ok((plan, bytes));
    }

    let parsed = parse(&case.query).map_err(|e| e.to_string())?;

    if let Some(pipeline_name) = pipeline {
        let ctx = make_context();
        let mut module = lower(&parsed, &ctx).map_err(|e| e.to_string())?;
        optimize(&mut module, &ctx, pipeline_name).map_err(|e| e.to_string())?;
        let bytes = module_to_flatbuffer(&module).map_err(|e| e.to_string())?;
        // The plan is recovered from the optimized bytes so both halves describe the same plan.
        let plan = deserialize_plan(&bytes).map_err(|e| e.to_string())?;
        return Ok((plan, bytes));
    }

    let plan = lower_to_plan(&parsed).map_err(|e| e.to_string())?;
    let bytes = lower_to_flatbuffer(&parsed).map_err(|e| e.to_string())?;
    Ok((plan, bytes))
}

/// Validates `plan` against the capability profile requested by the case's tags.
///
/// The recognised tags are `cap-graph-ref-enabled`, `cap-graph-ref-disabled`,
/// `cap-multi-graph-enabled`, and `cap-multi-graph-disabled`. When none is present the plan is
/// accepted without validation. Otherwise a full capability set pinned to the plan's own
/// version is built, and each of the two capabilities is switched on only when its `enabled`
/// tag is present and its `disabled` tag is absent.
///
/// # Errors
///
/// Returns the stringified error from `validate_plan_against_capabilities`.
fn validate_case_capabilities(case: &ConformanceCase, plan: &Plan) -> Result<(), String> {
    let graph_ref_disabled = has_tag(case, "cap-graph-ref-disabled");
    let graph_ref_enabled = has_tag(case, "cap-graph-ref-enabled");
    let multi_graph_disabled = has_tag(case, "cap-multi-graph-disabled");
    let multi_graph_enabled = has_tag(case, "cap-multi-graph-enabled");

    let profile_requested =
        graph_ref_disabled || graph_ref_enabled || multi_graph_disabled || multi_graph_enabled;
    if !profile_requested {
        return Ok(());
    }

    let plan_semver: PlanSemver = (&plan.version).into();
    let mut caps = EngineCapabilities::full(VersionRange::new(plan_semver, plan_semver));
    // A `disabled` tag wins over an `enabled` tag for the same capability.
    caps.supports_graph_ref = graph_ref_enabled && !graph_ref_disabled;
    caps.supports_multi_graph = multi_graph_enabled && !multi_graph_disabled;

    validate_plan_against_capabilities(plan, &caps).map_err(|e| e.to_string())
}

/// Executes `case` with no optimization pipeline.
///
/// # Errors
///
/// Propagates the error string from [`execute_case`].
pub(super) fn execute_case_unoptimized(case: &ConformanceCase) -> Result<QueryResult, String> {
    let no_pipeline: Option<&str> = None;
    execute_case(case, no_pipeline)
}

/// Executes `case` with the `PIPELINE_CORE` optimization pipeline.
///
/// # Errors
///
/// Propagates the error string from [`execute_case`].
pub(super) fn execute_case_optimized(case: &ConformanceCase) -> Result<QueryResult, String> {
    let pipeline = Some(PIPELINE_CORE);
    execute_case(case, pipeline)
}

/// Executes `case` with the caller-chosen optimization pipeline named `pipeline`.
///
/// # Errors
///
/// Propagates the error string from [`execute_case`].
pub(super) fn execute_case_with_pipeline(
    case: &ConformanceCase,
    pipeline: &str,
) -> Result<QueryResult, String> {
    let chosen = Some(pipeline);
    execute_case(case, chosen)
}

pub(super) fn execute_case_unoptimized_for_conformance(
    case: &ConformanceCase,
) -> Result<ConfQueryResult, String> {
    execute_case_unoptimized(case).map(local_result_to_conf)
}

pub(super) fn execute_case_optimized_for_conformance(
    case: &ConformanceCase,
) -> Result<ConfQueryResult, String> {
    execute_case_optimized(case).map(local_result_to_conf)
}