Skip to main content

sqlitegraph/
schema.rs

1use rusqlite::{Connection, OptionalExtension};
2
3use crate::errors::SqliteGraphError;
4
5pub const BASE_SCHEMA_VERSION: i64 = 1;
6
7struct MigrationStep {
8    target_version: i64,
9    statements: &'static [&'static str],
10}
11
12const MIGRATION_STEPS: &[MigrationStep] = &[
13    MigrationStep {
14        target_version: 2,
15        statements: &[
16            "CREATE TABLE IF NOT EXISTS graph_meta_history(version INTEGER NOT NULL, applied_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP)",
17            "INSERT INTO graph_meta_history(version) VALUES(2)",
18        ],
19    },
20    MigrationStep {
21        target_version: 3,
22        statements: &[
23            // HNSW index metadata table
24            "CREATE TABLE IF NOT EXISTS hnsw_indexes (
25                id INTEGER PRIMARY KEY AUTOINCREMENT,
26                name TEXT NOT NULL UNIQUE,
27                dimension INTEGER NOT NULL,
28                m INTEGER NOT NULL,
29                ef_construction INTEGER NOT NULL,
30                distance_metric TEXT NOT NULL,
31                vector_count INTEGER NOT NULL DEFAULT 0,
32                created_at INTEGER NOT NULL,
33                updated_at INTEGER NOT NULL
34            )",
35            // HNSW vectors table
36            "CREATE TABLE IF NOT EXISTS hnsw_vectors (
37                id INTEGER PRIMARY KEY AUTOINCREMENT,
38                index_id INTEGER NOT NULL,
39                vector_data BLOB NOT NULL,
40                metadata TEXT,
41                created_at INTEGER NOT NULL,
42                updated_at INTEGER NOT NULL,
43                FOREIGN KEY (index_id) REFERENCES hnsw_indexes(id) ON DELETE CASCADE
44            )",
45            // HNSW layers table
46            "CREATE TABLE IF NOT EXISTS hnsw_layers (
47                id INTEGER PRIMARY KEY AUTOINCREMENT,
48                index_id INTEGER NOT NULL,
49                layer_level INTEGER NOT NULL,
50                node_id INTEGER NOT NULL,
51                connections BLOB NOT NULL,
52                FOREIGN KEY (index_id) REFERENCES hnsw_indexes(id) ON DELETE CASCADE,
53                UNIQUE(index_id, layer_level, node_id)
54            )",
55            // HNSW entry points table
56            "CREATE TABLE IF NOT EXISTS hnsw_entry_points (
57                index_id INTEGER NOT NULL,
58                node_id INTEGER NOT NULL,
59                PRIMARY KEY (index_id, node_id),
60                FOREIGN KEY (index_id) REFERENCES hnsw_indexes(id) ON DELETE CASCADE
61            )",
62            // Indexes for performance
63            "CREATE INDEX IF NOT EXISTS idx_hnsw_vectors_index ON hnsw_vectors(index_id)",
64            "CREATE INDEX IF NOT EXISTS idx_hnsw_layers_index ON hnsw_layers(index_id, layer_level)",
65            "CREATE INDEX IF NOT EXISTS idx_hnsw_entry_points_index ON hnsw_entry_points(index_id)",
66            "INSERT INTO graph_meta_history(version) VALUES(3)",
67        ],
68    },
69];
70
71pub const SCHEMA_VERSION: i64 = BASE_SCHEMA_VERSION + MIGRATION_STEPS.len() as i64;
72
73#[derive(Debug, Clone, PartialEq, Eq)]
74pub struct MigrationReport {
75    pub from_version: i64,
76    pub to_version: i64,
77    pub statements: Vec<&'static str>,
78    pub dry_run: bool,
79}
80
81pub fn ensure_schema(conn: &Connection) -> Result<(), SqliteGraphError> {
82    ensure_base_schema(conn)?;
83    ensure_meta(conn)?;
84    run_pending_migrations(conn, false)?;
85    Ok(())
86}
87
88pub fn ensure_schema_without_migrations(conn: &Connection) -> Result<(), SqliteGraphError> {
89    ensure_base_schema(conn)?;
90    ensure_meta(conn)?;
91    Ok(())
92}
93
94fn ensure_base_schema(conn: &Connection) -> Result<(), SqliteGraphError> {
95    conn.execute_batch(
96        r#"
97        PRAGMA foreign_keys = ON;
98        CREATE TABLE IF NOT EXISTS graph_entities (
99            id        INTEGER PRIMARY KEY AUTOINCREMENT,
100            kind      TEXT NOT NULL,
101            name      TEXT NOT NULL,
102            file_path TEXT,
103            data      TEXT NOT NULL
104        );
105        CREATE TABLE IF NOT EXISTS graph_edges (
106            id        INTEGER PRIMARY KEY AUTOINCREMENT,
107            from_id   INTEGER NOT NULL,
108            to_id     INTEGER NOT NULL,
109            edge_type TEXT NOT NULL,
110            data      TEXT NOT NULL
111        );
112        CREATE TABLE IF NOT EXISTS graph_labels (
113            entity_id INTEGER NOT NULL,
114            label     TEXT NOT NULL
115        );
116        CREATE TABLE IF NOT EXISTS graph_properties (
117            entity_id INTEGER NOT NULL,
118            key       TEXT NOT NULL,
119            value     TEXT NOT NULL
120        );
121        CREATE INDEX IF NOT EXISTS idx_edges_from ON graph_edges(from_id);
122        CREATE INDEX IF NOT EXISTS idx_edges_to ON graph_edges(to_id);
123        CREATE INDEX IF NOT EXISTS idx_edges_type ON graph_edges(edge_type);
124        CREATE INDEX IF NOT EXISTS idx_labels_label ON graph_labels(label);
125        CREATE INDEX IF NOT EXISTS idx_labels_label_entity_id ON graph_labels(label, entity_id);
126        CREATE INDEX IF NOT EXISTS idx_props_key_value ON graph_properties(key, value);
127        CREATE INDEX IF NOT EXISTS idx_props_key_value_entity_id ON graph_properties(key, value, entity_id);
128        CREATE INDEX IF NOT EXISTS idx_entities_kind_id ON graph_entities(kind, id);
129        CREATE TABLE IF NOT EXISTS graph_meta (
130            id INTEGER PRIMARY KEY CHECK (id = 1),
131            schema_version INTEGER NOT NULL
132        );
133        "#,
134    )
135    .map_err(|e| SqliteGraphError::schema(e.to_string()))
136}
137
138pub fn read_schema_version(conn: &Connection) -> Result<i64, SqliteGraphError> {
139    conn.query_row(
140        "SELECT schema_version FROM graph_meta WHERE id=1",
141        [],
142        |row| row.get(0),
143    )
144    .map_err(|e| SqliteGraphError::schema(e.to_string()))
145}
146
147pub fn run_pending_migrations(
148    conn: &Connection,
149    dry_run: bool,
150) -> Result<MigrationReport, SqliteGraphError> {
151    let current = read_schema_version(conn)?;
152    let mut statements: Vec<&'static str> = Vec::new();
153    let mut target = current;
154    for step in MIGRATION_STEPS {
155        if step.target_version > current {
156            target = step.target_version;
157            statements.extend_from_slice(step.statements);
158        }
159    }
160    if statements.is_empty() {
161        return Ok(MigrationReport {
162            from_version: current,
163            to_version: current,
164            statements,
165            dry_run,
166        });
167    }
168    if dry_run {
169        return Ok(MigrationReport {
170            from_version: current,
171            to_version: target,
172            statements,
173            dry_run,
174        });
175    }
176    conn.execute("BEGIN IMMEDIATE", [])
177        .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
178    let result: Result<(), SqliteGraphError> = (|| {
179        for sql in statements.iter().copied() {
180            conn.execute(sql, [])
181                .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
182        }
183        conn.execute(
184            "UPDATE graph_meta SET schema_version=?1 WHERE id=1",
185            [target],
186        )
187        .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
188        Ok(())
189    })();
190    match result {
191        Ok(()) => {
192            conn.execute("COMMIT", [])
193                .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
194        }
195        Err(err) => {
196            let _ = conn.execute("ROLLBACK", []);
197            return Err(err);
198        }
199    }
200    Ok(MigrationReport {
201        from_version: current,
202        to_version: target,
203        statements,
204        dry_run,
205    })
206}
207
208fn ensure_meta(conn: &Connection) -> Result<(), SqliteGraphError> {
209    let version: Option<i64> = conn
210        .query_row(
211            "SELECT schema_version FROM graph_meta WHERE id=1",
212            [],
213            |row| row.get(0),
214        )
215        .optional()
216        .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
217    match version {
218        Some(existing) => {
219            if existing > SCHEMA_VERSION {
220                return Err(SqliteGraphError::schema(format!(
221                    "database schema version {existing} is newer than supported {SCHEMA_VERSION}"
222                )));
223            }
224            if existing < BASE_SCHEMA_VERSION {
225                conn.execute(
226                    "UPDATE graph_meta SET schema_version=?1 WHERE id=1",
227                    [BASE_SCHEMA_VERSION],
228                )
229                .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
230            }
231        }
232        None => {
233            conn.execute(
234                "INSERT INTO graph_meta(id, schema_version) VALUES(1, ?1)",
235                [BASE_SCHEMA_VERSION],
236            )
237            .map_err(|e| SqliteGraphError::schema(e.to_string()))?;
238        }
239    }
240    Ok(())
241}