llkv-sql 0.8.5-alpha

SQL interface for the LLKV toolkit.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
//! SQL-level MVCC persistence integration tests.
//!
//! These tests verify that MVCC (Multi-Version Concurrency Control) transaction state
//! persists correctly across database reopens when using durable pagers. Unlike the
//! low-level MVCC tests in llkv-transaction, these tests use SQL statements (BEGIN,
//! COMMIT, ROLLBACK, INSERT, UPDATE, SELECT, DELETE) to verify persistence behavior.
//!
//! ## What is tested
//!
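//! - Committed transactions persist and remain visible after database reopen
//! - Uncommitted transactions are lost and their data is invisible after reopen
//! - Rolled-back transactions leave no visible data, both before and after reopen
//! - Transaction IDs don't overlap across sessions (no reuse); this is exercised only
//!   indirectly, by committing in successive sessions against the same file
//! - Soft deletes (DELETE statements) persist correctly
//! - Multiple committed transactions work correctly
//! - Mixed INSERT/UPDATE/DELETE workloads persist across several reopens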
//!
//! ## Implementation
//!
//! Each test:
//! 1. Creates a temporary file-backed database using SimdRDrivePager
//! 2. Executes SQL statements in a transaction (BEGIN...COMMIT or ROLLBACK)
//! 3. Closes the database (drops Context and SqlEngine)
//! 4. Reopens the same database file with a fresh Context
//! 5. Verifies data visibility matches expected MVCC behavior
//!
//! The persistence is handled automatically by:
//! - Context loading transaction state from catalog on initialization
//! - Session persisting transaction state to catalog on commit
//! - No manual MVCC layer manipulation required

use arrow::array::{Array, Int64Array, StringArray};
use llkv_runtime::{RuntimeContext, RuntimeStatementResult};
use llkv_sql::SqlEngine;
use llkv_storage::pager::{BoxedPager, simd_r_drive_pager::SimdRDrivePager};
use std::sync::Arc;
use tempfile::NamedTempFile;

/// Builds a `SqlEngine` whose storage is a `SimdRDrivePager` opened at `path`.
///
/// Every call opens the file again and wraps it in a brand-new
/// `RuntimeContext`, so the tests use this both for the initial open and for
/// each "reopen" of the same database file.
///
/// # Panics
///
/// Panics with "Failed to open SimdRDrivePager" if the pager cannot be opened.
fn create_engine_with_file(path: &std::path::Path) -> SqlEngine {
    let file_pager = SimdRDrivePager::open(path).expect("Failed to open SimdRDrivePager");
    let boxed_pager = BoxedPager::from_arc(Arc::new(file_pager));
    let runtime = RuntimeContext::new(Arc::new(boxed_pager));
    // NOTE(review): the meaning of the `false` flag is not visible from this file;
    // confirm against `SqlEngine::with_context`.
    SqlEngine::with_context(Arc::new(runtime), false)
}

/// Runs `sql` on `engine` and flattens the first column of every returned
/// batch into a single list of nullable `i64` values (`None` for SQL NULL).
///
/// Batches that report zero columns contribute no values.
///
/// # Panics
///
/// Panics if execution fails, if the call does not yield exactly one
/// statement result, if that result is not a `SELECT`, if the batches cannot
/// be collected, or if the first column is not an `Int64Array`.
fn select_int64_values(engine: &SqlEngine, sql: &str) -> Vec<Option<i64>> {
    let mut results = engine.execute(sql).expect("SELECT failed");
    assert_eq!(results.len(), 1, "Expected exactly one result");

    let execution = match results.remove(0) {
        RuntimeStatementResult::Select { execution, .. } => execution,
        _ => panic!("Expected SELECT result"),
    };

    let mut values = Vec::new();
    for batch in execution.collect().expect("Failed to collect batches") {
        if batch.num_columns() == 0 {
            continue;
        }

        let column = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("Expected Int64Array");

        // Map each row to `None` when null, otherwise to its value.
        values.extend((0..column.len()).map(|row| (!column.is_null(row)).then(|| column.value(row))));
    }
    values
}

/// Runs `sql` on `engine` and flattens the first column of every returned
/// batch into a single list of nullable owned strings (`None` for SQL NULL).
///
/// Batches that report zero columns contribute no values.
///
/// # Panics
///
/// Panics if execution fails, if the call does not yield exactly one
/// statement result, if that result is not a `SELECT`, if the batches cannot
/// be collected, or if the first column is not a `StringArray`.
fn select_string_values(engine: &SqlEngine, sql: &str) -> Vec<Option<String>> {
    let mut results = engine.execute(sql).expect("SELECT failed");
    assert_eq!(results.len(), 1, "Expected exactly one result");

    let execution = match results.remove(0) {
        RuntimeStatementResult::Select { execution, .. } => execution,
        _ => panic!("Expected SELECT result"),
    };

    let mut values = Vec::new();
    for batch in execution.collect().expect("Failed to collect batches") {
        if batch.num_columns() == 0 {
            continue;
        }

        let column = batch
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("Expected StringArray");

        // Map each row to `None` when null, otherwise to an owned copy of its text.
        values.extend(
            (0..column.len())
                .map(|row| (!column.is_null(row)).then(|| column.value(row).to_string())),
        );
    }
    values
}

/// Returns the number of rows in `table` as reported by `SELECT COUNT(*)`.
///
/// `table` is interpolated into the query text verbatim.
///
/// # Panics
///
/// Panics if the query does not produce exactly one value or if that value
/// is NULL, in addition to any panic raised by `select_int64_values`.
fn count_rows(engine: &SqlEngine, table: &str) -> i64 {
    let query = format!("SELECT COUNT(*) FROM {}", table);
    let counts = select_int64_values(engine, &query);
    assert_eq!(counts.len(), 1, "Expected exactly one count result");

    // The assertion above guarantees a first element; `flatten` exposes its nullability.
    counts
        .into_iter()
        .next()
        .flatten()
        .expect("COUNT should not be NULL")
}

/// Rows committed through SQL must still be readable after the database file
/// is reopened by a brand-new engine.
#[test]
fn test_sql_committed_transaction_persists_across_reopen() {
    let backing_file = NamedTempFile::new().expect("Failed to create temp file");
    let path = backing_file.path();

    // Session 1: create the table and commit three rows in an explicit transaction.
    {
        let db = create_engine_with_file(path);

        db.execute("CREATE TABLE users(id INTEGER, name TEXT)")
            .expect("CREATE TABLE failed");
        db.execute("BEGIN TRANSACTION").expect("BEGIN failed");
        db.execute("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie')")
            .expect("INSERT failed");

        // The open transaction must see its own writes.
        assert_eq!(count_rows(&db, "users"), 3, "Expected 3 rows before commit");

        db.execute("COMMIT").expect("COMMIT failed");

        // The rows remain visible once the transaction is committed.
        assert_eq!(count_rows(&db, "users"), 3, "Expected 3 rows after commit");
    }

    // Session 2: a new engine over the same file must see the committed rows.
    {
        let db = create_engine_with_file(path);

        // The table definition itself has to survive the reopen.
        db.execute("SELECT * FROM users")
            .expect("Table should exist after reopen");

        assert_eq!(count_rows(&db, "users"), 3, "Expected 3 rows after reopen");

        let names = select_string_values(&db, "SELECT name FROM users ORDER BY id");
        let expected: Vec<Option<String>> = ["Alice", "Bob", "Charlie"]
            .iter()
            .map(|name| Some((*name).to_string()))
            .collect();
        assert_eq!(names, expected);
    }
}

/// Rows inserted inside a transaction that is never committed must not be
/// visible once the database file is reopened.
#[test]
fn test_sql_uncommitted_transaction_not_visible_after_reopen() {
    let backing_file = NamedTempFile::new().expect("Failed to create temp file");
    let path = backing_file.path();

    // Session 1: create the table, then insert inside a transaction that is never committed.
    {
        let db = create_engine_with_file(path);

        // The table is created before any explicit transaction is opened.
        db.execute("CREATE TABLE products(id INTEGER, name TEXT)")
            .expect("CREATE TABLE failed");

        assert_eq!(
            count_rows(&db, "products"),
            0,
            "Table should be empty initially"
        );

        db.execute("BEGIN TRANSACTION").expect("BEGIN failed");
        db.execute("INSERT INTO products VALUES (1, 'Widget'), (2, 'Gadget')")
            .expect("INSERT failed");

        // The open transaction must see its own writes.
        assert_eq!(
            count_rows(&db, "products"),
            2,
            "Expected 2 rows in uncommitted transaction"
        );

        // `db` is dropped here with the transaction still open: no COMMIT is issued.
    }

    // Session 2: the table must still exist, but the uncommitted rows must be gone.
    {
        let db = create_engine_with_file(path);

        db.execute("SELECT * FROM products")
            .expect("Table should exist after reopen");

        assert_eq!(
            count_rows(&db, "products"),
            0,
            "Expected 0 rows after reopen (uncommitted data lost)"
        );
    }
}

/// Transactions committed in separate sessions against the same file must
/// accumulate: each reopen sees everything committed so far.
#[test]
fn test_sql_multiple_transactions_persist_correctly() {
    let backing_file = NamedTempFile::new().expect("Failed to create temp file");
    let path = backing_file.path();

    // Session 1: create the table and commit the first two orders.
    {
        let db = create_engine_with_file(path);

        // Each entry pairs a statement with the panic message used if it fails.
        for (sql, failure) in [
            (
                "CREATE TABLE orders(id INTEGER, amount INTEGER)",
                "CREATE TABLE failed",
            ),
            ("BEGIN TRANSACTION", "BEGIN failed"),
            ("INSERT INTO orders VALUES (1, 100), (2, 200)", "INSERT failed"),
            ("COMMIT", "COMMIT failed"),
        ] {
            db.execute(sql).expect(failure);
        }

        assert_eq!(
            count_rows(&db, "orders"),
            2,
            "Expected 2 rows after first commit"
        );
    }

    // Session 2: reopen, confirm the first batch, then commit two more orders.
    {
        let db = create_engine_with_file(path);

        assert_eq!(count_rows(&db, "orders"), 2, "Expected 2 rows after reopen");

        for (sql, failure) in [
            ("BEGIN TRANSACTION", "BEGIN failed"),
            ("INSERT INTO orders VALUES (3, 300), (4, 400)", "INSERT failed"),
            ("COMMIT", "COMMIT failed"),
        ] {
            db.execute(sql).expect(failure);
        }

        assert_eq!(
            count_rows(&db, "orders"),
            4,
            "Expected 4 rows after second commit"
        );
    }

    // Session 3: reopen once more and check that both batches are present.
    {
        let db = create_engine_with_file(path);

        assert_eq!(
            count_rows(&db, "orders"),
            4,
            "Expected 4 rows after final reopen"
        );

        let ids = select_int64_values(&db, "SELECT id FROM orders ORDER BY id");
        let expected: Vec<Option<i64>> = (1..=4).map(Some).collect();
        assert_eq!(ids, expected);
    }
}

/// Rows inserted by a transaction that is rolled back must be invisible both
/// right after the ROLLBACK and after the database file is reopened.
#[test]
fn test_sql_rollback_not_visible_after_reopen() {
    let backing_file = NamedTempFile::new().expect("Failed to create temp file");
    let path = backing_file.path();

    // Session 1: commit two rows, then insert two more and roll them back.
    {
        let db = create_engine_with_file(path);

        db.execute("CREATE TABLE inventory(id INTEGER, quantity INTEGER)")
            .expect("CREATE TABLE failed");

        // First transaction: committed.
        db.execute("BEGIN TRANSACTION").expect("BEGIN failed");
        db.execute("INSERT INTO inventory VALUES (1, 10), (2, 20)")
            .expect("INSERT failed");
        db.execute("COMMIT").expect("COMMIT failed");

        assert_eq!(count_rows(&db, "inventory"), 2, "Expected 2 rows after commit");

        // Second transaction: rolled back.
        db.execute("BEGIN TRANSACTION").expect("BEGIN failed");
        db.execute("INSERT INTO inventory VALUES (3, 30), (4, 40)")
            .expect("INSERT failed");

        assert_eq!(
            count_rows(&db, "inventory"),
            4,
            "Expected 4 rows in transaction before rollback"
        );

        db.execute("ROLLBACK").expect("ROLLBACK failed");

        assert_eq!(
            count_rows(&db, "inventory"),
            2,
            "Expected 2 rows after rollback"
        );
    }

    // Session 2: after reopening, only the committed rows may be present.
    {
        let db = create_engine_with_file(path);

        assert_eq!(
            count_rows(&db, "inventory"),
            2,
            "Expected 2 rows after reopen (rolled back data not visible)"
        );

        let ids = select_int64_values(&db, "SELECT id FROM inventory ORDER BY id");
        let expected: Vec<Option<i64>> = (1..=2).map(Some).collect();
        assert_eq!(ids, expected);
    }
}

/// Rows removed by committed DELETE statements must stay removed after the
/// database file is reopened.
#[test]
fn test_sql_delete_persists_across_reopen() {
    let backing_file = NamedTempFile::new().expect("Failed to create temp file");
    let path = backing_file.path();

    // Session 1: commit four employees, then delete two of them in a second transaction.
    {
        let db = create_engine_with_file(path);

        db.execute("CREATE TABLE employees(id INTEGER, name TEXT)")
            .expect("CREATE TABLE failed");

        db.execute("BEGIN TRANSACTION").expect("BEGIN failed");
        db.execute(
            "INSERT INTO employees VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie'), (4, 'Dave')",
        )
        .expect("INSERT failed");
        db.execute("COMMIT").expect("COMMIT failed");

        assert_eq!(count_rows(&db, "employees"), 4, "Expected 4 rows after insert");

        db.execute("BEGIN TRANSACTION").expect("BEGIN failed");
        for delete_sql in [
            "DELETE FROM employees WHERE id = 2",
            "DELETE FROM employees WHERE id = 4",
        ] {
            db.execute(delete_sql).expect("DELETE failed");
        }

        // The deleting transaction must already see the reduced row set.
        assert_eq!(
            count_rows(&db, "employees"),
            2,
            "Expected 2 rows after delete in transaction"
        );

        db.execute("COMMIT").expect("COMMIT failed");

        assert_eq!(
            count_rows(&db, "employees"),
            2,
            "Expected 2 rows after delete commit"
        );
    }

    // Session 2: after reopening, the deleted rows must still be absent.
    {
        let db = create_engine_with_file(path);

        assert_eq!(
            count_rows(&db, "employees"),
            2,
            "Expected 2 rows after reopen (deletes should persist)"
        );

        let names = select_string_values(&db, "SELECT name FROM employees ORDER BY id");
        let expected: Vec<Option<String>> = ["Alice", "Charlie"]
            .iter()
            .map(|name| Some((*name).to_string()))
            .collect();
        assert_eq!(names, expected);
    }
}

/// End-to-end scenario: INSERT in one session, UPDATE and DELETE in a second,
/// then verification in a third, reopening the database file between each.
#[test]
fn test_sql_mixed_operations_persist() {
    let backing_file = NamedTempFile::new().expect("Failed to create temp file");
    let path = backing_file.path();

    // Session 1: create the table and commit three accounts.
    {
        let db = create_engine_with_file(path);

        // Each entry pairs a statement with the panic message used if it fails.
        for (sql, failure) in [
            (
                "CREATE TABLE accounts(id INTEGER, balance INTEGER)",
                "CREATE TABLE failed",
            ),
            ("BEGIN TRANSACTION", "BEGIN failed"),
            (
                "INSERT INTO accounts VALUES (1, 1000), (2, 2000), (3, 3000)",
                "INSERT failed",
            ),
            ("COMMIT", "COMMIT failed"),
        ] {
            db.execute(sql).expect(failure);
        }
    }

    // Session 2: change one balance and delete one account in a single transaction.
    {
        let db = create_engine_with_file(path);

        assert_eq!(count_rows(&db, "accounts"), 3, "Expected 3 accounts");

        for (sql, failure) in [
            ("BEGIN TRANSACTION", "BEGIN failed"),
            (
                "UPDATE accounts SET balance = 1500 WHERE id = 1",
                "UPDATE failed",
            ),
            ("DELETE FROM accounts WHERE id = 3", "DELETE failed"),
            ("COMMIT", "COMMIT failed"),
        ] {
            db.execute(sql).expect(failure);
        }

        assert_eq!(
            count_rows(&db, "accounts"),
            2,
            "Expected 2 accounts after update/delete"
        );
    }

    // Session 3: after a final reopen, both the update and the delete must be visible.
    {
        let db = create_engine_with_file(path);

        assert_eq!(
            count_rows(&db, "accounts"),
            2,
            "Expected 2 accounts after reopen"
        );

        let balances = select_int64_values(&db, "SELECT balance FROM accounts ORDER BY id");
        let expected: Vec<Option<i64>> = vec![Some(1500), Some(2000)];
        assert_eq!(balances, expected);
    }
}