grapha 0.4.0

Blazingly fast code intelligence CLI and MCP server for Swift and Rust
Documentation
use std::path::{Path, PathBuf};

use rusqlite::Connection;

use crate::delta::{GraphDelta, edge_fingerprint};
use crate::store::{StoreWriteStats, sqlite::schema};
use grapha_core::graph::{Edge, Node};

use super::{
    SqliteStore, compat::serialize_provenance, edge_kind_str, flow_direction_str, node_kind_str,
    visibility_str,
};

pub(super) fn remove_existing_store_files(path: &Path) -> anyhow::Result<()> {
    for candidate in [
        path.to_path_buf(),
        PathBuf::from(format!("{}-wal", path.to_string_lossy())),
        PathBuf::from(format!("{}-shm", path.to_string_lossy())),
    ] {
        match std::fs::remove_file(&candidate) {
            Ok(()) => {}
            Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
            Err(error) => return Err(error.into()),
        }
    }
    Ok(())
}

pub(super) fn insert_nodes(
    tx: &rusqlite::Transaction<'_>,
    nodes: &[Node],
    replace: bool,
) -> anyhow::Result<()> {
    let verb = if replace {
        "INSERT OR REPLACE"
    } else {
        "INSERT"
    };
    let sql = format!(
        "{verb} INTO nodes (id, kind, name, file,
            span_start_line, span_start_col, span_end_line, span_end_col,
            visibility, metadata, role, signature, doc_comment, module, snippet, repo)
         VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16)"
    );
    let mut stmt = tx.prepare_cached(&sql)?;
    let empty_meta = "{}".to_string();
    let mut meta_buf = String::new();
    for node in nodes {
        let role_json: Option<String> =
            node.role.as_ref().map(serde_json::to_string).transpose()?;
        let meta_ref: &str = if node.metadata.is_empty() {
            &empty_meta
        } else {
            meta_buf.clear();
            serde_json::to_writer(unsafe { meta_buf.as_mut_vec() }, &node.metadata)?;
            &meta_buf
        };
        let file_str = node.file.to_string_lossy();
        stmt.execute(rusqlite::params![
            node.id,
            node_kind_str(&node.kind),
            node.name,
            file_str.as_ref(),
            node.span.start[0] as i64,
            node.span.start[1] as i64,
            node.span.end[0] as i64,
            node.span.end[1] as i64,
            visibility_str(&node.visibility),
            meta_ref,
            role_json,
            node.signature,
            node.doc_comment,
            node.module,
            node.snippet,
            node.repo,
        ])?;
    }
    Ok(())
}

pub(super) fn insert_edges<'a>(
    tx: &rusqlite::Transaction<'_>,
    edges: impl Iterator<Item = (String, &'a Edge)>,
    replace: bool,
) -> anyhow::Result<()> {
    let verb = if replace {
        "INSERT OR REPLACE"
    } else {
        "INSERT"
    };
    let sql = format!(
        "{verb} INTO edges (edge_id, source, target, kind, confidence,
            direction, operation, condition, async_boundary, provenance, repo)
         VALUES (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11)"
    );
    let mut stmt = tx.prepare_cached(&sql)?;
    for (edge_id, edge) in edges {
        let direction_str = edge.direction.as_ref().map(flow_direction_str);
        let async_boundary_int = edge.async_boundary.map(|b| if b { 1 } else { 0 });
        let provenance = serialize_provenance(&edge.provenance)?;
        stmt.execute(rusqlite::params![
            edge_id,
            edge.source,
            edge.target,
            edge_kind_str(&edge.kind),
            edge.confidence,
            direction_str,
            edge.operation,
            edge.condition,
            async_boundary_int,
            provenance,
            edge.repo,
        ])?;
    }
    Ok(())
}

pub(super) fn save_full(
    store: &SqliteStore,
    graph: &grapha_core::graph::Graph,
) -> anyhow::Result<()> {
    remove_existing_store_files(&store.path)?;
    let conn = Connection::open(&store.path)?;
    conn.execute_batch(
        "PRAGMA journal_mode=OFF;
         PRAGMA synchronous=OFF;
         PRAGMA temp_store=MEMORY;
         PRAGMA cache_size=-64000;
         PRAGMA mmap_size=268435456;
         PRAGMA locking_mode=EXCLUSIVE;
         PRAGMA page_size=8192;",
    )?;

    schema::drop_tables(&conn)?;
    schema::create_bulk_load_tables(&conn)?;

    let tx = conn.unchecked_transaction()?;
    schema::write_meta(&tx, graph)?;
    insert_nodes(&tx, &graph.nodes, false)?;
    insert_edges(
        &tx,
        graph
            .edges
            .iter()
            .map(|edge| (edge_fingerprint(edge), edge)),
        false,
    )?;
    tx.commit()?;
    schema::create_bulk_load_indexes(&conn)?;
    conn.execute_batch("PRAGMA optimize;")?;
    Ok(())
}

pub(super) fn save_incremental(
    store: &SqliteStore,
    previous: Option<&grapha_core::graph::Graph>,
    graph: &grapha_core::graph::Graph,
) -> anyhow::Result<StoreWriteStats> {
    let conn = store.open_for_write()?;
    let schema_version = schema::schema_version(&conn)?;
    if previous.is_none() || schema_version.as_deref() != Some(schema::STORE_SCHEMA_VERSION) {
        let full_stats =
            StoreWriteStats::from_graphs(previous, graph, crate::delta::SyncMode::FullRebuild);
        drop(conn);
        save_full(store, graph)?;
        return Ok(full_stats);
    }

    let previous_graph = previous.expect("checked is_some above");
    let delta = GraphDelta::between(previous_graph, graph);

    let stats = StoreWriteStats {
        mode: crate::delta::SyncMode::Incremental,
        nodes: delta.node_stats(),
        edges: delta.edge_stats(),
    };

    if delta.is_empty() {
        return Ok(stats);
    }

    let tx = conn.unchecked_transaction()?;
    schema::write_meta(&tx, graph)?;

    {
        let mut delete_edges = tx.prepare("DELETE FROM edges WHERE edge_id = ?1")?;
        for edge_id in &delta.deleted_edge_ids {
            delete_edges.execute([edge_id])?;
        }
    }

    {
        let mut delete_nodes = tx.prepare("DELETE FROM nodes WHERE id = ?1")?;
        for node_id in &delta.deleted_node_ids {
            delete_nodes.execute([node_id])?;
        }
    }

    let mut changed_nodes = Vec::new();
    changed_nodes.extend(delta.added_nodes.iter().copied().cloned());
    changed_nodes.extend(delta.updated_nodes.iter().copied().cloned());
    insert_nodes(&tx, &changed_nodes, true)?;

    {
        let mut delete_edges = tx.prepare("DELETE FROM edges WHERE edge_id = ?1")?;
        for edge in &delta.updated_edges {
            delete_edges.execute([&edge.id])?;
        }
    }
    insert_edges(
        &tx,
        delta
            .added_edges
            .iter()
            .chain(delta.updated_edges.iter())
            .map(|edge| (edge.id.clone(), edge.edge)),
        true,
    )?;
    tx.commit()?;
    conn.execute_batch("PRAGMA optimize;")?;

    Ok(stats)
}