sqlitegraph 2.2.2

Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and a comprehensive graph algorithms library.
Documentation
use rusqlite::{Connection, OptionalExtension};

use crate::errors::SqliteGraphError;

/// Schema version of a database that has only the base tables and no
/// migration steps applied. `ensure_meta` writes this value into a freshly
/// created `graph_meta` row and raises any lower stored value up to it.
pub const BASE_SCHEMA_VERSION: i64 = 1;

/// One schema migration: the version it leads to and the SQL that gets there.
struct MigrationStep {
    /// Schema version the database is at once this step has been applied.
    /// A step is considered pending while the stored version is below this.
    target_version: i64,
    /// SQL statements of this step, executed one by one in the listed order.
    statements: &'static [&'static str],
}

/// Ordered list of schema migrations applied on top of the base schema.
///
/// `SCHEMA_VERSION` is computed as `BASE_SCHEMA_VERSION + MIGRATION_STEPS.len()`,
/// so every new step must be appended at the end and use the next consecutive
/// `target_version`; otherwise the stored version and `SCHEMA_VERSION` would
/// disagree. Each step finishes by recording its version in
/// `graph_meta_history`.
const MIGRATION_STEPS: &[MigrationStep] = &[
    // v2: introduce the migration history table and record the first entry.
    MigrationStep {
        target_version: 2,
        statements: &[
            "CREATE TABLE IF NOT EXISTS graph_meta_history(version INTEGER NOT NULL, applied_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP)",
            "INSERT INTO graph_meta_history(version) VALUES(2)",
        ],
    },
    // v3: tables and indexes backing the HNSW vector indexes. All child
    // tables reference `hnsw_indexes(id)` with ON DELETE CASCADE.
    MigrationStep {
        target_version: 3,
        statements: &[
            // HNSW index metadata table
            "CREATE TABLE IF NOT EXISTS hnsw_indexes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                dimension INTEGER NOT NULL,
                m INTEGER NOT NULL,
                ef_construction INTEGER NOT NULL,
                distance_metric TEXT NOT NULL,
                vector_count INTEGER NOT NULL DEFAULT 0,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL
            )",
            // HNSW vectors table
            "CREATE TABLE IF NOT EXISTS hnsw_vectors (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                index_id INTEGER NOT NULL,
                vector_data BLOB NOT NULL,
                metadata TEXT,
                created_at INTEGER NOT NULL,
                updated_at INTEGER NOT NULL,
                FOREIGN KEY (index_id) REFERENCES hnsw_indexes(id) ON DELETE CASCADE
            )",
            // HNSW layers table
            "CREATE TABLE IF NOT EXISTS hnsw_layers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                index_id INTEGER NOT NULL,
                layer_level INTEGER NOT NULL,
                node_id INTEGER NOT NULL,
                connections BLOB NOT NULL,
                FOREIGN KEY (index_id) REFERENCES hnsw_indexes(id) ON DELETE CASCADE,
                UNIQUE(index_id, layer_level, node_id)
            )",
            // HNSW entry points table
            "CREATE TABLE IF NOT EXISTS hnsw_entry_points (
                index_id INTEGER NOT NULL,
                node_id INTEGER NOT NULL,
                PRIMARY KEY (index_id, node_id),
                FOREIGN KEY (index_id) REFERENCES hnsw_indexes(id) ON DELETE CASCADE
            )",
            // Indexes for performance
            "CREATE INDEX IF NOT EXISTS idx_hnsw_vectors_index ON hnsw_vectors(index_id)",
            "CREATE INDEX IF NOT EXISTS idx_hnsw_layers_index ON hnsw_layers(index_id, layer_level)",
            "CREATE INDEX IF NOT EXISTS idx_hnsw_entry_points_index ON hnsw_entry_points(index_id)",
            "INSERT INTO graph_meta_history(version) VALUES(3)",
        ],
    },
    // v4: additional lookup indexes on `graph_entities` by kind and by
    // (kind, name).
    MigrationStep {
        target_version: 4,
        statements: &[
            "CREATE INDEX IF NOT EXISTS idx_entities_kind ON graph_entities(kind)",
            "CREATE INDEX IF NOT EXISTS idx_entities_kind_name ON graph_entities(kind, name)",
            "INSERT INTO graph_meta_history(version) VALUES(4)",
        ],
    },
];

/// Newest schema version this build knows how to produce: the base version
/// plus one per entry in `MIGRATION_STEPS`. This relies on the steps having
/// consecutive target versions starting right after `BASE_SCHEMA_VERSION`.
/// `ensure_meta` rejects databases whose stored version exceeds this value.
pub const SCHEMA_VERSION: i64 = BASE_SCHEMA_VERSION + MIGRATION_STEPS.len() as i64;

/// Summary returned by `run_pending_migrations`, for both real and dry runs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationReport {
    /// Schema version found in the database before any migration was applied.
    pub from_version: i64,
    /// Schema version after the run; equal to `from_version` when nothing
    /// was pending. For a dry run this is the version that would be reached.
    pub to_version: i64,
    /// SQL statements of all pending steps, in execution order. Empty when
    /// the database was already up to date.
    pub statements: Vec<&'static str>,
    /// Copy of the `dry_run` flag the run was invoked with; when `true` the
    /// statements were only collected, not executed.
    pub dry_run: bool,
}

pub fn ensure_schema(conn: &Connection) -> Result<(), SqliteGraphError> {
    ensure_base_schema(conn)?;
    ensure_meta(conn)?;
    run_pending_migrations(conn, false)?;
    Ok(())
}

/// Prepares the base schema and the `graph_meta` version row, but leaves the
/// migration steps untouched.
///
/// # Errors
///
/// Returns the `SqliteGraphError` of whichever stage fails first; the
/// version-row check does not run if creating the base tables failed.
pub fn ensure_schema_without_migrations(conn: &Connection) -> Result<(), SqliteGraphError> {
    ensure_base_schema(conn).and_then(|()| ensure_meta(conn))
}

/// Creates the base tables and indexes if they do not exist yet.
///
/// The whole DDL script is sent as one batch. Every `CREATE` uses
/// `IF NOT EXISTS`, so running it against an already initialised database
/// does not fail on existing objects. The script also issues
/// `PRAGMA foreign_keys = ON` on this connection.
///
/// # Errors
///
/// Any SQLite failure is converted to `SqliteGraphError::schema` carrying the
/// driver's error message.
fn ensure_base_schema(conn: &Connection) -> Result<(), SqliteGraphError> {
    const BASE_SCHEMA_SQL: &str = r#"
        PRAGMA foreign_keys = ON;
        CREATE TABLE IF NOT EXISTS graph_entities (
            id        INTEGER PRIMARY KEY AUTOINCREMENT,
            kind      TEXT NOT NULL,
            name      TEXT NOT NULL,
            file_path TEXT,
            data      TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS graph_edges (
            id        INTEGER PRIMARY KEY AUTOINCREMENT,
            from_id   INTEGER NOT NULL,
            to_id     INTEGER NOT NULL,
            edge_type TEXT NOT NULL,
            data      TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS graph_labels (
            entity_id INTEGER NOT NULL,
            label     TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS graph_properties (
            entity_id INTEGER NOT NULL,
            key       TEXT NOT NULL,
            value     TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_edges_from ON graph_edges(from_id);
        CREATE INDEX IF NOT EXISTS idx_edges_to ON graph_edges(to_id);
        CREATE INDEX IF NOT EXISTS idx_edges_type ON graph_edges(edge_type);
        CREATE INDEX IF NOT EXISTS idx_labels_label ON graph_labels(label);
        CREATE INDEX IF NOT EXISTS idx_labels_label_entity_id ON graph_labels(label, entity_id);
        CREATE INDEX IF NOT EXISTS idx_props_key_value ON graph_properties(key, value);
        CREATE INDEX IF NOT EXISTS idx_props_key_value_entity_id ON graph_properties(key, value, entity_id);
        CREATE INDEX IF NOT EXISTS idx_entities_kind_id ON graph_entities(kind, id);
        CREATE TABLE IF NOT EXISTS graph_meta (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            schema_version INTEGER NOT NULL
        );
        "#;

    match conn.execute_batch(BASE_SCHEMA_SQL) {
        Ok(()) => Ok(()),
        Err(cause) => Err(SqliteGraphError::schema(cause.to_string())),
    }
}

/// Reads the schema version stored in the single `graph_meta` row (`id = 1`).
///
/// # Errors
///
/// Returns `SqliteGraphError::schema` with the driver's message when the
/// query fails, which includes the case where the row does not exist.
pub fn read_schema_version(conn: &Connection) -> Result<i64, SqliteGraphError> {
    let lookup = conn.query_row(
        "SELECT schema_version FROM graph_meta WHERE id=1",
        [],
        |row| row.get::<_, i64>(0),
    );
    match lookup {
        Ok(version) => Ok(version),
        Err(cause) => Err(SqliteGraphError::schema(cause.to_string())),
    }
}

/// Applies (or, for a dry run, merely lists) every migration step whose
/// `target_version` is newer than the version stored in `graph_meta`.
///
/// * `conn` - connection to migrate. It must not already be inside a
///   transaction, because a real run opens its own with `BEGIN IMMEDIATE`.
/// * `dry_run` - when `true`, nothing is executed; the report only describes
///   what a real run would do.
///
/// A real run executes all pending statements and the version bump inside a
/// single `BEGIN IMMEDIATE` transaction. The stored version is read again
/// *after* the write lock has been taken and the plan is rebuilt from that
/// value, so a second connection that migrated the database in the meantime
/// cannot cause the same steps (and their `graph_meta_history` rows) to be
/// applied twice. In that case the report reflects the version seen under the
/// lock.
///
/// # Errors
///
/// Every SQLite failure is returned as `SqliteGraphError::schema` with the
/// driver's message. If any statement or the final `COMMIT` fails, a
/// best-effort `ROLLBACK` is issued so the connection is not left inside an
/// open transaction.
pub fn run_pending_migrations(
    conn: &Connection,
    dry_run: bool,
) -> Result<MigrationReport, SqliteGraphError> {
    // Cheap unlocked look: enough for dry runs and for the common
    // "already up to date" case, neither of which writes anything.
    let observed = read_schema_version(conn)?;
    let (planned_target, planned) = pending_migrations_from(observed);
    if dry_run || planned.is_empty() {
        return Ok(MigrationReport {
            from_version: observed,
            to_version: planned_target,
            statements: planned,
            dry_run,
        });
    }

    conn.execute("BEGIN IMMEDIATE", [])
        .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
    let outcome = apply_pending_in_transaction(conn).and_then(|report| {
        conn.execute("COMMIT", [])
            .map(|_| report)
            .map_err(|e| SqliteGraphError::schema(e.to_string()))
    });
    if outcome.is_err() {
        // Best effort: the original error is what the caller needs to see,
        // and ROLLBACK itself fails harmlessly if SQLite already rolled back.
        let _ = conn.execute("ROLLBACK", []);
    }
    outcome
}

/// Collects the statements of every step newer than `current`, in order,
/// together with the version they lead to. When no statements are pending the
/// returned version equals `current`.
fn pending_migrations_from(current: i64) -> (i64, Vec<&'static str>) {
    let mut target = current;
    let mut statements: Vec<&'static str> = Vec::new();
    for step in MIGRATION_STEPS
        .iter()
        .filter(|step| step.target_version > current)
    {
        target = step.target_version;
        statements.extend_from_slice(step.statements);
    }
    if statements.is_empty() {
        target = current;
    }
    (target, statements)
}

/// Re-reads the stored version and applies whatever is still pending.
/// Must be called with a write transaction already open on `conn`; it neither
/// commits nor rolls back.
fn apply_pending_in_transaction(conn: &Connection) -> Result<MigrationReport, SqliteGraphError> {
    // Authoritative read: the write lock is held, so this cannot go stale.
    let current = read_schema_version(conn)?;
    let (target, statements) = pending_migrations_from(current);
    if !statements.is_empty() {
        for sql in statements.iter().copied() {
            conn.execute(sql, [])
                .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
        }
        conn.execute(
            "UPDATE graph_meta SET schema_version=?1 WHERE id=1",
            [target],
        )
        .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
    }
    Ok(MigrationReport {
        from_version: current,
        to_version: target,
        statements,
        dry_run: false,
    })
}

/// Makes sure `graph_meta` holds a usable version row.
///
/// * No row yet: inserts one carrying `BASE_SCHEMA_VERSION`.
/// * Stored version below `BASE_SCHEMA_VERSION`: raises it to the base.
/// * Stored version within the supported range: leaves it alone.
///
/// # Errors
///
/// Fails with `SqliteGraphError::schema` when the stored version is newer
/// than `SCHEMA_VERSION`, or when any of the SQL statements fails.
fn ensure_meta(conn: &Connection) -> Result<(), SqliteGraphError> {
    let stored = conn
        .query_row(
            "SELECT schema_version FROM graph_meta WHERE id=1",
            [],
            |row| row.get::<_, i64>(0),
        )
        .optional()
        .map_err(|e| SqliteGraphError::schema(e.to_string()))?;

    // Decide which write (if any) is needed; both writes bind the base
    // version as their only parameter.
    let write_base_version = match stored {
        None => "INSERT INTO graph_meta(id, schema_version) VALUES(1, ?1)",
        Some(existing) if existing > SCHEMA_VERSION => {
            return Err(SqliteGraphError::schema(format!(
                "database schema version {existing} is newer than supported {SCHEMA_VERSION}"
            )));
        }
        Some(existing) if existing < BASE_SCHEMA_VERSION => {
            "UPDATE graph_meta SET schema_version=?1 WHERE id=1"
        }
        Some(_) => return Ok(()),
    };

    conn.execute(write_base_version, [BASE_SCHEMA_VERSION])
        .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
    Ok(())
}