plexus-engine 0.3.6

Engine integration traits for consuming Plexus plans
Documentation
use super::*;
use plexus_ir::{
    lower, lower_to_flatbuffer, make_context, module_to_flatbuffer, optimize, PIPELINE_CORE,
    PIPELINE_FULL,
};
use plexus_parser::parse;
use plexus_serde::{current_plan_version, ColDef, ColKind, LogicalType, Version};

fn fixture_graph() -> Graph {
    let node = |id: u64, labels: &[&str], props: &[(&str, Value)]| Node {
        id,
        labels: labels.iter().map(|x| x.to_string()).collect(),
        props: props
            .iter()
            .map(|(k, v)| ((*k).to_string(), v.clone()))
            .collect(),
    };
    let rel = |id: u64, src: u64, dst: u64, typ: &str, props: &[(&str, Value)]| Relationship {
        id,
        src,
        dst,
        typ: typ.to_string(),
        props: props
            .iter()
            .map(|(k, v)| ((*k).to_string(), v.clone()))
            .collect(),
    };

    Graph {
        nodes: vec![
            node(
                1,
                &["Person"],
                &[
                    ("name", Value::String("Alice".to_string())),
                    ("age", Value::Int(30)),
                ],
            ),
            node(
                2,
                &["Person"],
                &[
                    ("name", Value::String("Bob".to_string())),
                    ("age", Value::Int(40)),
                ],
            ),
            node(
                3,
                &["Company"],
                &[("name", Value::String("Acme".to_string()))],
            ),
        ],
        rels: vec![
            rel(10, 1, 2, "KNOWS", &[]),
            rel(11, 2, 1, "KNOWS", &[]),
            rel(12, 2, 3, "WORKS_AT", &[]),
        ],
    }
}

fn fixture_vector_graph() -> Graph {
    let node = |id: u64, labels: &[&str], props: &[(&str, Value)]| Node {
        id,
        labels: labels.iter().map(|x| x.to_string()).collect(),
        props: props
            .iter()
            .map(|(k, v)| ((*k).to_string(), v.clone()))
            .collect(),
    };

    Graph {
        nodes: vec![
            node(
                1,
                &["Doc"],
                &[
                    ("title", Value::String("Alpha".to_string())),
                    (
                        "embedding",
                        Value::List(vec![Value::Float(1.0), Value::Float(0.0)]),
                    ),
                    ("relevance", Value::Float(0.8)),
                ],
            ),
            node(
                2,
                &["Doc"],
                &[
                    ("title", Value::String("Beta".to_string())),
                    (
                        "embedding",
                        Value::List(vec![Value::Float(0.0), Value::Float(1.0)]),
                    ),
                    ("relevance", Value::Float(0.5)),
                ],
            ),
            node(
                3,
                &["Doc"],
                &[
                    ("title", Value::String("Gamma".to_string())),
                    (
                        "embedding",
                        Value::List(vec![Value::Float(0.8), Value::Float(0.2)]),
                    ),
                    ("relevance", Value::Float(0.9)),
                ],
            ),
        ],
        rels: vec![],
    }
}

fn mock_vector_engine() -> MockVectorEngine {
    mock_vector_engine_with_graph(fixture_vector_graph())
}

fn mock_vector_engine_with_graph(graph: Graph) -> MockVectorEngine {
    let mut engine = MockVectorEngine::new(graph.clone());
    engine.insert_collection(
        "docs",
        graph
            .nodes
            .iter()
            .map(|node| {
                let embedding = match node.props.get("embedding") {
                    Some(Value::List(items)) => items
                        .iter()
                        .map(|item| match item {
                            Value::Float(v) => *v,
                            Value::Int(v) => *v as f64,
                            other => panic!("unexpected embedding item: {other:?}"),
                        })
                        .collect(),
                    other => panic!("unexpected embedding payload: {other:?}"),
                };
                VectorCollectionEntry {
                    node_id: node.id,
                    embedding,
                }
            })
            .collect(),
    );
    engine
}

fn version() -> Version {
    current_plan_version("plexus-engine-test")
}

fn normalize_rows(rows: &[Row]) -> Vec<String> {
    let mut out: Vec<String> = rows.iter().map(|r| format!("{r:?}")).collect();
    out.sort();
    out
}

fn assert_query_equivalent_under_pipeline(cypher: &str, pipeline: &str) {
    let query = parse(cypher).expect("parse");
    let ctx = make_context();

    let unopt_module = lower(&query, &ctx).expect("lower unoptimized");
    let unopt_bytes =
        module_to_flatbuffer(&unopt_module).expect("module_to_flatbuffer unoptimized");
    let mut unopt_engine = InMemoryEngine::new(fixture_graph());
    let unopt_out =
        execute_serialized(&mut unopt_engine, &unopt_bytes).expect("execute unoptimized");

    let mut opt_module = lower(&query, &ctx).expect("lower optimized");
    optimize(&mut opt_module, &ctx, pipeline).expect("optimize");
    let opt_bytes = module_to_flatbuffer(&opt_module).expect("module_to_flatbuffer optimized");
    let mut opt_engine = InMemoryEngine::new(fixture_graph());
    let opt_out = execute_serialized(&mut opt_engine, &opt_bytes).expect("execute optimized");

    assert_eq!(
        normalize_rows(&opt_out.rows),
        normalize_rows(&unopt_out.rows),
        "optimized and unoptimized results differ for query: {cypher}",
    );
}

mod cypher_pipeline;
mod dml_merge;
mod embedded_profile;
mod equivalence;
mod execution_ops;
mod gql_pipeline;
mod independent_consumer;
mod optimizer_equivalence;
mod tck_readonly;
mod test_compiler;
mod vector_conformance;
mod vector_mock;