kglite 0.10.19

Pure-Rust knowledge graph engine — Cypher pipeline, snapshot/working copy-on-write (CoW) transactions, columnar/mmap/disk storage backends, and optional dataset loaders (SEC EDGAR, Sodir, Wikidata). PyO3 wrappers live in the sibling kglite-py crate (the Python wheel); the engine itself can be embedded directly in any Rust binary without pulling PyO3 into the dependency tree.
Documentation
//! Cypher pipeline orchestration — single source of truth.
//!
//! Mirrors the canonical pipeline that previously lived inline at
//! `src/graph/pyapi/kg_core.rs::cypher`:
//!
//! ```text
//! parse_cypher → validate_schema → warn_unknown_pattern_refs
//!   → rewrite_text_score (+embed if needed; skipped under EXPLAIN)
//!   → optimize_with_disabled → [mark_lazy_eligibility] → is_mutation_query
//!   → generate_explain_result | execute | execute_mutable
//! ```
//!
//! [`execute_read`] takes `&DirGraph` (auto-commit reads + in-tx reads
//! against working/snapshot). [`execute_mut`] takes `&mut DirGraph`
//! (in-tx writes against `Transaction::working_mut()`).

use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

use crate::datatypes::Value;
use crate::error::KgError;
use crate::graph::dir_graph::DirGraph;
use crate::graph::embedder::Embedder;
use crate::graph::languages::cypher;
use crate::graph::languages::cypher::ast::{CypherQuery, OutputFormat};
use crate::graph::languages::cypher::result::CypherResult;

/// Per-query knobs. Borrowed for the duration of one execute call.
///
/// The constructors ([`ExecuteOptions::new`] / [`ExecuteOptions::eager`])
/// produce conservative defaults: `lazy_eligible=false`, no deadline,
/// no max_rows, no disabled passes, no embedder. The Python boundary
/// (kg_core.rs) overrides `lazy_eligible` to `true`; every other
/// binding should keep the eager default unless it can materialize
/// lazy results.
pub struct ExecuteOptions<'a> {
    /// Parameter bindings (`$x` references). Empty map = no params.
    pub params: &'a HashMap<String, Value>,
    /// Optional execution deadline, forwarded to the executor. Executor
    /// failures (including running past the deadline) surface as
    /// `KgError::CypherExecution`. None = no deadline.
    pub deadline: Option<Instant>,
    /// Optional row cap. None = no cap. Applied on the read-executor
    /// path only; mutation queries run via `execute_mutable`, which
    /// does not receive it.
    pub max_rows: Option<usize>,
    /// Lazy-projection mode.
    ///
    /// - `true` (what the Python boundary sets): call
    ///   `mark_lazy_eligibility` after optimize + pass `streaming=true`
    ///   to the read executor. The `CypherResult.lazy` field may be
    ///   `Some(LazyResultDescriptor)`; callers that want eager rows
    ///   must materialize via the lazy helper in
    ///   `src/graph/pyapi/result_view.rs`.
    /// - `false` (constructor default; bolt-server, mcp-server): skip
    ///   `mark_lazy_eligibility` + pass `streaming=false`. The
    ///   executor materializes every row into `CypherResult.rows`.
    ///
    /// **Important:** setting `lazy_eligible=true` without a lazy
    /// materializer to consume `result.lazy` yields silently empty
    /// row sets — exactly the bolt-server bug fixed during the
    /// robustness pass. That is why the constructors default to
    /// `false`; the Python boundary flips it to `true` to benefit from
    /// the lazy path in interactive use.
    pub lazy_eligible: bool,
    /// Optional set of planner passes to disable. None means "use
    /// the static empty set" (no allocation; the common case).
    pub disabled_passes: Option<&'a HashSet<String>>,
    /// Optional embedder for `text_score()` queries. If a non-EXPLAIN
    /// query uses `text_score()` and this is `None`, execution fails
    /// with `KgError::CypherExecution` ("text_score() requires a
    /// registered embedding model ..."). EXPLAIN never embeds, so it
    /// does not need an embedder.
    pub embedder: Option<Arc<dyn Embedder>>,
}

impl<'a> ExecuteOptions<'a> {
    /// Builds options with the conservative defaults: eager execution
    /// (`lazy_eligible: false`), so it is safe for any consumer without
    /// a lazy materializer. There is no deadline, no row cap, no
    /// disabled planner passes and no embedder. Only `params` is taken
    /// from the caller.
    ///
    /// Identical to [`Self::eager`]. `new` exists for conventional Rust
    /// API discovery; call-sites generally prefer the intent-named
    /// `eager`.
    pub fn new(params: &'a HashMap<String, Value>) -> Self {
        Self {
            params,
            lazy_eligible: false,
            deadline: None,
            max_rows: None,
            embedder: None,
            disabled_passes: None,
        }
    }

    /// Eager-execution defaults — the constructor non-Python bindings
    /// should reach for, because they have no lazy result materializer.
    ///
    /// Produces exactly what [`Self::new`] produces. Tweak individual
    /// fields afterwards where needed:
    /// - set a deadline for timeouts;
    /// - set an embedder when `text_score()` queries are expected.
    ///
    /// Introduced on 2026-05-25 so mcp-server / bolt-server no longer
    /// spell out the struct literal by hand. They now call
    /// `ExecuteOptions::eager(params)`, which documents the intent at
    /// the call-site.
    pub fn eager(params: &'a HashMap<String, Value>) -> Self {
        Self::new(params)
    }
}

/// Result of a successful execute. Wraps `CypherResult` with the
/// metadata callers need for output serialization (CSV, DataFrame,
/// PackStream record emission).
pub struct ExecuteOutcome {
    /// The query result. Under `EXPLAIN` it holds the rendered plan
    /// rows instead of data.
    pub result: CypherResult,
    /// `true` when `cypher::is_mutation_query` classifies the query as
    /// a mutation (CREATE/SET/DELETE/REMOVE/MERGE).
    ///
    /// `execute_read` rejects such queries with `KgError::Argument`,
    /// *except* when they are prefixed with `EXPLAIN`. In that case it
    /// returns the plan with this flag set, so an `EXPLAIN` run works
    /// as a dry-run check for "would this query mutate?". From
    /// `execute_mut`, `false` means the query ran as a read and nothing
    /// was changed.
    pub is_mutation: bool,
    /// Output format requested by the query — e.g. `RETURN ... FORMAT
    /// CSV` (kglite extension). pyapi + mcp-server format the result
    /// accordingly.
    pub output_format: OutputFormat,
    /// `true` when the user prefixed the query with `EXPLAIN`. Nothing
    /// was executed and `result` contains the rendered plan rows rather
    /// than real data; callers may want to format / display differently.
    pub explain: bool,
}

/// Read-only execution. Errors if the query mutates.
///
/// Caller responsibilities:
/// - Provide a `&DirGraph` (snapshot for auto-commit, or
///   `tx.current()` for in-tx reads).
/// - Decode params (`Bolt`/`Py` → `Value`) before calling.
/// - Map the returned `KgError` to the binding's error type
///   (PyErr subclass via `From`, `BoltError` via the
///   `kg_to_bolt`/`string_to_bolt` helpers in bolt-server).
pub fn execute_read(
    graph: &DirGraph,
    query: &str,
    opts: &ExecuteOptions<'_>,
) -> Result<ExecuteOutcome, KgError> {
    let (parsed, params) = prepare(graph, query, opts)?;
    let is_mutation = cypher::is_mutation_query(&parsed);

    // EXPLAIN: render plan rows, skip execution.
    if parsed.explain {
        let result = cypher::generate_explain_result(&parsed, graph);
        return Ok(ExecuteOutcome {
            result,
            is_mutation,
            output_format: parsed.output_format,
            explain: true,
        });
    }

    if is_mutation {
        return Err(KgError::Argument(
            "execute_read called with a mutation query (CREATE/SET/DELETE/REMOVE/MERGE) \
             — use execute_mut against a mutable graph view"
                .to_string(),
        ));
    }

    let result = cypher::CypherExecutor::with_params(graph, &params, opts.deadline)
        .with_max_rows(opts.max_rows)
        .with_streaming(opts.lazy_eligible)
        .execute(&parsed)
        .map_err(|message| KgError::CypherExecution {
            message,
            position: None,
        })?;

    Ok(ExecuteOutcome {
        result,
        is_mutation: false,
        output_format: parsed.output_format,
        explain: false,
    })
}

/// Mutating execution. Caller passes `&mut DirGraph` (typically
/// from `Transaction::working_mut()`). For pure reads, use
/// [`execute_read`] instead.
///
/// Note: a read query passed to `execute_mut` runs against the
/// mutable graph view as a read. The function returns
/// `is_mutation: false` in that case so the caller knows nothing
/// was changed.
pub fn execute_mut(
    graph: &mut DirGraph,
    query: &str,
    opts: &ExecuteOptions<'_>,
) -> Result<ExecuteOutcome, KgError> {
    let (parsed, params) = prepare(graph, query, opts)?;
    let is_mutation = cypher::is_mutation_query(&parsed);

    if parsed.explain {
        let result = cypher::generate_explain_result(&parsed, graph);
        return Ok(ExecuteOutcome {
            result,
            is_mutation,
            output_format: parsed.output_format,
            explain: true,
        });
    }

    let result = if is_mutation {
        cypher::execute_mutable(graph, &parsed, params, opts.deadline).map_err(|message| {
            KgError::CypherExecution {
                message,
                position: None,
            }
        })?
    } else {
        cypher::CypherExecutor::with_params(graph, &params, opts.deadline)
            .with_max_rows(opts.max_rows)
            .with_streaming(opts.lazy_eligible)
            .execute(&parsed)
            .map_err(|message| KgError::CypherExecution {
                message,
                position: None,
            })?
    };

    Ok(ExecuteOutcome {
        result,
        is_mutation,
        output_format: parsed.output_format,
        explain: false,
    })
}

/// Shared preparation: parse → validate → rewrite_text_score → embed
/// (if needed) → optimize → optional mark_lazy. Returns the
/// parsed+optimized AST + the (possibly-augmented-with-embeddings)
/// param map.
///
/// The params map is borrowed from `opts.params` in the common case
/// (no text_score). When text_score() is present, we clone-on-write
/// to inject the embedding result vectors into the map — the
/// returned `HashMap<String, Value>` is owned in that case.
///
/// **GIL note for binding implementers.** If `opts.embedder` is a
/// Python-backed embedder (PyEmbedderAdapter), the binding MUST
/// release the GIL before calling `execute_read`/`execute_mut`
/// (Python's `py.detach`). The embed call inside this fn will then
/// re-acquire the GIL briefly to invoke Python; if you forget to
/// release first, it deadlocks.
fn prepare(
    graph: &DirGraph,
    query: &str,
    opts: &ExecuteOptions<'_>,
) -> Result<(CypherQuery, HashMap<String, Value>), KgError> {
    let mut parsed = cypher::parse_cypher(query)?;

    // Schema validation — property typos in pattern literals
    // (`{ttle: 'Alice'}`) get caught with a "did you mean?" hint.
    cypher::validate_schema(&parsed, graph).map_err(KgError::from)?;

    // Non-fatal: warn (stderr) when a MATCH references an unknown node label
    // or relationship type — the most common "why is my query empty?" typo.
    cypher::warn_unknown_pattern_refs(&parsed, graph);

    // text_score() rewrite. Scans for `text_score(...)` calls in the
    // AST and rewrites them to `vector_score(...)`, collecting the
    // texts to embed alongside.
    let rewrite = cypher::rewrite_text_score(&mut parsed, opts.params).map_err(|message| {
        KgError::CypherExecution {
            message,
            position: None,
        }
    })?;

    // If text_score(...) was used (and we're NOT in EXPLAIN mode —
    // EXPLAIN renders plan rows without executing, so no embedding
    // needed), run the embedder and inject the result vectors into
    // the param map. Otherwise pass the caller's params through.
    let params: Cow<'_, HashMap<String, Value>> =
        if !rewrite.texts_to_embed.is_empty() && !parsed.explain {
            Cow::Owned(embed_into_params(opts, &rewrite)?)
        } else {
            Cow::Borrowed(opts.params)
        };

    // Optimize. Empty disabled-set is the common case; avoid the
    // HashSet allocation when no passes are disabled.
    let disabled_default = cypher::planner::empty_disabled_set();
    let disabled_ref = opts.disabled_passes.unwrap_or(disabled_default);
    cypher::planner::optimize_with_disabled(&mut parsed, graph, &params, disabled_ref);

    // Lazy marking — only when the caller asked for it. Without
    // this call, the executor materializes rows eagerly. With it,
    // `result.lazy` may be Some and `result.rows` empty; the
    // caller must handle materialization (Python's ResultView
    // does; bolt-server doesn't, so it passes `lazy_eligible: false`).
    if opts.lazy_eligible {
        cypher::mark_lazy_eligibility(&mut parsed);
    }

    Ok((parsed, params.into_owned()))
}

/// Embeds every text collected by the `text_score()` rewrite. Returns a
/// copy of `opts.params` in which each rewrite parameter name is bound
/// to its vector as a JSON-array string (`"[0.1, 0.2, ...]"`). The
/// caller's map is never mutated.
///
/// The embedder is loaded before embedding and unloaded right after the
/// embed call, even when that call fails. If `load` itself fails,
/// nothing is unloaded.
///
/// # Errors
/// `KgError::CypherExecution` when:
/// - no embedder is configured;
/// - `load` or `embed` fails;
/// - the number of returned vectors differs from the number of texts.
fn embed_into_params(
    opts: &ExecuteOptions<'_>,
    rewrite: &cypher::planner::simplification::TextScoreRewrite,
) -> Result<HashMap<String, Value>, KgError> {
    // Non-capturing closure, so it is `Copy` and reusable below.
    let to_err = |message: String| KgError::CypherExecution {
        message,
        position: None,
    };

    let embedder = match opts.embedder.as_ref() {
        Some(embedder) => embedder,
        None => {
            return Err(to_err(
                "text_score() requires a registered embedding model. \
                 Call g.set_embedder(model) first (Python) or pass an embedder \
                 via ExecuteOptions::embedder (downstream Rust consumers)."
                    .to_string(),
            ))
        }
    };
    embedder.load().map_err(to_err)?;

    let texts: Vec<String> = rewrite
        .texts_to_embed
        .iter()
        .map(|(_, text)| text.clone())
        .collect();

    // Capture the result first so `unload` runs whether or not
    // embedding succeeded.
    let outcome = embedder.embed(&texts);
    embedder.unload();
    let vectors: Vec<Vec<f32>> = outcome.map_err(to_err)?;

    if vectors.len() != texts.len() {
        return Err(to_err(format!(
            "text_score: model.embed() returned {} vectors for {} texts",
            vectors.len(),
            texts.len()
        )));
    }

    let mut augmented = opts.params.clone();
    for ((param_name, _), vector) in rewrite.texts_to_embed.iter().zip(&vectors) {
        let encoded: Vec<String> = vector.iter().map(ToString::to_string).collect();
        augmented.insert(
            param_name.clone(),
            Value::String(format!("[{}]", encoded.join(", "))),
        );
    }
    Ok(augmented)
}