iridium-db 0.2.0

A high-performance vector-graph hybrid storage and indexing engine
use std::collections::HashSet;

use crate::features::storage::api as storage_api;
use plexus_serde::ExpandDir;

use super::super::super::{execution, ExplainError, Result, Row, RowStream};
use super::aggregate::apply_aggregate;
use super::types::{apply_post_ops, AdapterExecutionPlan, ExpandKind, ExpandSpec};
use crate::features::runtime::api::ExecuteParams;

/// Runs an adapter execution plan against `handle` and returns its row stream.
///
/// When the plan carries an expand spec, the whole plan is delegated to
/// `execute_with_expand`. Otherwise the explain plan is executed and the
/// optional stages are applied in this order: unwind values, aggregate,
/// post ops.
///
/// # Errors
///
/// Propagates any error returned by the base execution, by the unwind step,
/// or by the expand path.
pub(super) fn execute_adapter_plan(
    plan: &AdapterExecutionPlan,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
) -> Result<RowStream> {
    match plan.expand {
        // Expand plans have their own pipeline, including aggregate and post ops.
        Some(spec) => execute_with_expand(plan, params, handle, spec),
        None => {
            let mut out = execution::execute(&plan.explain_plan, params, handle)?;

            if let Some(values) = &plan.unwind_values {
                // The unwound rows replace the base rows; counters on `out` are kept.
                let unwound = execute_unwind_values(handle, &out.rows, values)?;
                out.rows = unwound;
            }

            if let Some(aggregate) = plan.aggregate {
                out.rows = apply_aggregate(out.rows, aggregate);
            }

            if plan.path_construct {
                // Intentionally a no-op: the row contract has no slot for an
                // explicit path value, so rows pass through unchanged.
            }

            apply_post_ops(&mut out, plan);
            Ok(out)
        }
    }
}

fn execute_with_expand(
    plan: &AdapterExecutionPlan,
    params: &ExecuteParams,
    handle: &mut storage_api::StorageHandle,
    expand_spec: ExpandSpec,
) -> Result<RowStream> {
    if expand_spec.dir != ExpandDir::Out {
        return Err(ExplainError::UnsupportedSerializedOperator(
            "Expand execution currently supports only Out direction".to_string(),
        ));
    }
    let source_stream = execution::execute(&plan.explain_plan, params, handle)?;
    let source_rows = if let Some(values) = &plan.unwind_values {
        execute_unwind_values(handle, &source_stream.rows, values)?
    } else {
        source_stream.rows.clone()
    };
    let mut rows = Vec::new();

    for source in &source_rows {
        let logical = storage_api::get_logical_node(handle, source.node_id).map_err(|error| {
            ExplainError::InvalidPlan(format!("storage read failed: {:?}", error))
        })?;
        let neighbors = logical.adjacency();
        match expand_spec.kind {
            ExpandKind::Expand => {
                for neighbor_id in neighbors {
                    rows.push(neighbor_row(handle, neighbor_id)?);
                }
            }
            ExpandKind::OptionalExpand => {
                if neighbors.is_empty() {
                    rows.push(source.clone());
                } else {
                    for neighbor_id in neighbors {
                        rows.push(neighbor_row(handle, neighbor_id)?);
                    }
                }
            }
            ExpandKind::SemiExpand => {
                if !neighbors.is_empty() {
                    rows.push(source.clone());
                }
            }
            ExpandKind::ExpandVarLen { min_hops, max_hops } => {
                let mut frontier = vec![source.node_id];
                let mut seen = HashSet::new();
                for hop in 1..=max_hops {
                    let mut next = Vec::new();
                    for node_id in frontier {
                        let logical =
                            storage_api::get_logical_node(handle, node_id).map_err(|error| {
                                ExplainError::InvalidPlan(format!(
                                    "storage read failed: {:?}",
                                    error
                                ))
                            })?;
                        for neighbor in logical.adjacency() {
                            next.push(neighbor);
                            if hop >= min_hops && seen.insert(neighbor) {
                                rows.push(neighbor_row(handle, neighbor)?);
                            }
                        }
                    }
                    frontier = next;
                    if frontier.is_empty() {
                        break;
                    }
                }
            }
        }
    }

    let mut stream = RowStream {
        rows,
        scanned_nodes: source_stream.scanned_nodes,
        latency_micros: source_stream.latency_micros,
        morsels_processed: source_stream.morsels_processed,
        rerank_batches: source_stream.rerank_batches,
        parallel_workers: source_stream.parallel_workers,
    };
    if let Some(aggregate) = plan.aggregate {
        stream.rows = apply_aggregate(stream.rows, aggregate);
    }
    apply_post_ops(&mut stream, plan);
    Ok(stream)
}

/// Builds the unwound row set: for every source row, one row per entry in `values`.
///
/// The contents of the source rows are not consulted; only their count
/// determines how many times the `values` sequence is repeated. Each value is
/// treated as a node id and turned into a row via `neighbor_row`.
///
/// # Errors
///
/// Stops at, and returns, the first error produced while building a row.
pub(super) fn execute_unwind_values(
    handle: &mut storage_api::StorageHandle,
    source_rows: &[Row],
    values: &[u64],
) -> Result<Vec<Row>> {
    source_rows
        .iter()
        // Repeat the full value list once per source row, in order.
        .flat_map(move |_| values.iter())
        .map(|&node_id| neighbor_row(handle, node_id))
        .collect()
}

/// Builds the `Row` for `node_id` from its stored row summary.
///
/// When storage has no summary for the node, the row is still produced, with
/// `has_full` false and both counts zero. `score` and `aggregate_value` are
/// always `None`.
///
/// # Errors
///
/// Returns `ExplainError::InvalidPlan` when the summary lookup fails.
fn neighbor_row(handle: &mut storage_api::StorageHandle, node_id: u64) -> Result<Row> {
    let lookup = storage_api::get_node_row_summary(handle, node_id)
        .map_err(|error| ExplainError::InvalidPlan(format!("storage read failed: {:?}", error)))?;

    // Only these three fields depend on whether a summary was found.
    let (has_full, delta_count, adjacency_degree) = match lookup {
        Some(found) => (found.has_full, found.delta_count, found.adjacency_degree),
        None => (false, 0, 0),
    };

    Ok(Row {
        node_id,
        has_full,
        delta_count,
        adjacency_degree,
        score: None,
        aggregate_value: None,
    })
}