sqlitegraph 2.0.7

Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and comprehensive graph algorithms library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
//! In-memory HashMap-based KV store implementation

use crate::backend::native::v2::kv_store::ttl;
use crate::backend::native::v2::kv_store::types::{KvEntry, KvMetadata, KvStoreError, KvValue};
use crate::snapshot::SnapshotId;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::time::SystemTime;

/// In-memory key-value store with multi-version storage
///
/// Each key retains a version history (a `Vec<KvEntry>` ordered by ascending LSN).
/// This enables true MVCC snapshot isolation where older snapshots see older versions.
///
/// The map is guarded by a `parking_lot::RwLock`: read accessors take the read
/// lock, mutating methods take the write lock.
#[derive(Debug, Default)]
pub struct KvStore {
    /// Key -> version history, oldest version first (ascending LSN).
    ///
    /// Crate-visible so the other `kv_store` modules (WAL recovery and TTL
    /// cleanup) can operate on the raw histories.
    pub(crate) entries: RwLock<HashMap<Vec<u8>, Vec<KvEntry>>>,
}

impl KvStore {
    /// Create a new empty KV store
    pub fn new() -> Self {
        Self::default()
    }

    /// Get a value by key
    ///
    /// For tests only - production code should use get_at_snapshot()
    /// Returns most recent committed value (latest version in history)
    /// TTL is checked lazily: expired entries return None
    pub fn get(&self, key: &[u8]) -> Result<Option<KvValue>, KvStoreError> {
        let entries = self.entries.read();
        if let Some(versions) = entries.get(key) {
            // Get latest version (last element in Vec)
            if let Some(entry) = versions.last() {
                // Check TTL before returning value
                if ttl::is_expired(entry) {
                    return Ok(None);
                }
                Ok(Some(entry.value.clone()))
            } else {
                Ok(None)
            }
        } else {
            Ok(None)
        }
    }

    /// Get a value at a specific snapshot
    ///
    /// This enforces true MVCC snapshot isolation: finds the latest version
    /// committed at or before the given snapshot_id.
    ///
    /// Uses binary search (O(log n)) to find the correct version in the history.
    ///
    /// TTL is checked lazily: expired entries are filtered on read.
    ///
    /// # Arguments
    /// * `key` - Key to retrieve
    /// * `snapshot_id` - Only return data committed at or before this snapshot
    ///
    /// # Returns
    /// The value if found and visible at snapshot, or None if not found or not visible
    pub fn get_at_snapshot(
        &self,
        key: &[u8],
        snapshot_id: SnapshotId,
    ) -> Result<Option<KvValue>, KvStoreError> {
        let entries = self.entries.read();
        let snapshot_lsn = snapshot_id.as_lsn();

        if let Some(versions) = entries.get(key) {
            // Snapshot at 0 means "see all data" - return latest version
            if snapshot_lsn == 0 {
                if let Some(entry) = versions.last() {
                    if ttl::is_expired(entry) {
                        return Ok(None);
                    }
                    return Ok(Some(entry.value.clone()));
                }
                return Ok(None);
            }

            // Binary search for the latest version with version <= snapshot_lsn
            // partition_point returns index of first element where predicate is false
            // We want: entry.version <= snapshot_lsn
            let idx = versions.partition_point(|e| e.metadata.version <= snapshot_lsn);

            if idx == 0 {
                // All versions are newer than snapshot (all version > snapshot_lsn)
                return Ok(None);
            }

            // versions[idx - 1] is the latest version with version <= snapshot_lsn
            let entry = &versions[idx - 1];

            // Check if entry is expired (lazy TTL cleanup)
            if ttl::is_expired(entry) {
                return Ok(None);
            }

            // Entry is visible and not expired
            Ok(Some(entry.value.clone()))
        } else {
            // Key not found
            Ok(None)
        }
    }

    /// Set a value with optional TTL
    ///
    /// Appends a new version to the key's version history.
    /// The version number is set to 0 and will be updated by the WAL system.
    pub fn set(
        &mut self,
        key: Vec<u8>,
        value: KvValue,
        ttl: Option<u64>,
    ) -> Result<(), KvStoreError> {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        let mut entries = self.entries.write();

        // Get created_at from existing versions (if any)
        let created_at = if let Some(versions) = entries.get(&key) {
            if let Some(latest) = versions.last() {
                latest.metadata.created_at
            } else {
                now
            }
        } else {
            now
        };

        let metadata = KvMetadata {
            created_at,
            updated_at: now,
            ttl_seconds: ttl,
            version: 0, // Will be set by WAL in plan 02
        };

        let entry = KvEntry {
            key: key.clone(),
            value,
            metadata,
        };

        // Append new version to history (maintains sorted order since LSNs are monotonic)
        entries.entry(key).or_default().push(entry);

        Ok(())
    }

    /// Delete a key
    ///
    /// Removes the entire version history for the key.
    pub fn delete(&mut self, key: &[u8]) -> Result<(), KvStoreError> {
        let mut entries = self.entries.write();
        entries
            .remove(key)
            .map(|_| ())
            .ok_or_else(|| KvStoreError::KeyNotFound(key.to_vec()))
    }

    /// Check if a key exists
    ///
    /// Note: This checks TTL lazily - expired keys return false even if present in storage.
    /// Only the latest version is checked.
    pub fn exists(&self, key: &[u8]) -> bool {
        let entries = self.entries.read();
        if let Some(versions) = entries.get(key) {
            // Check latest version
            if let Some(entry) = versions.last() {
                // Key exists, but check if expired
                !ttl::is_expired(entry)
            } else {
                false
            }
        } else {
            false
        }
    }

    /// Get the number of entries
    pub fn len(&self) -> usize {
        let entries = self.entries.read();
        entries.len()
    }

    /// Scan all entries with a given prefix at a snapshot
    ///
    /// Returns all keys that start with the given prefix, along with their values.
    /// Results are sorted in lexicographic order by key.
    ///
    /// # Arguments
    /// * `snapshot_id` - Only return data committed at or before this snapshot
    /// * `prefix` - Prefix to match (empty prefix returns all keys)
    ///
    /// # Returns
    /// Vector of (key, value) pairs for all matching keys
    pub fn prefix_scan(
        &self,
        snapshot_id: SnapshotId,
        prefix: &[u8],
    ) -> Result<Vec<(Vec<u8>, KvValue)>, KvStoreError> {
        let entries = self.entries.read();
        let snapshot_lsn = snapshot_id.as_lsn();

        let mut results = Vec::new();
        for (key, versions) in entries.iter() {
            if !key.starts_with(prefix) {
                continue;
            }

            // Find version visible at snapshot
            let entry = if snapshot_lsn == 0 {
                // Snapshot at 0 means "see all data" - get latest version
                versions.last()
            } else {
                // Binary search for version <= snapshot_lsn
                let idx = versions.partition_point(|e| e.metadata.version <= snapshot_lsn);
                if idx == 0 {
                    None
                } else {
                    Some(&versions[idx - 1])
                }
            };

            if let Some(e) = entry {
                if !ttl::is_expired(e) {
                    results.push((key.clone(), e.value.clone()));
                }
            }
        }
        results.sort_by(|a, b| a.0.cmp(&b.0)); // Lexicographic order
        Ok(results)
    }

    /// Explicit cleanup of all expired entries
    ///
    /// This is a manual cleanup operation - NOT called automatically.
    /// Lazy cleanup on read is sufficient for correctness.
    /// This method is only for space reclamation optimization.
    ///
    /// # Returns
    /// The number of entries removed
    pub fn cleanup_expired(&mut self) -> usize {
        ttl::cleanup_expired_entries(self)
    }

    /// Internal method for WAL replay - set with explicit version
    ///
    /// This is used during WAL recovery to restore entries with their original versions.
    /// Normal set() operations should use version 0 (the WAL system assigns the real version).
    ///
    /// Maintains version history in sorted order by LSN.
    pub fn set_with_version(
        &mut self,
        key: Vec<u8>,
        value: KvValue,
        ttl: Option<u64>,
        version: u64,
    ) -> Result<(), KvStoreError> {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);

        let mut entries = self.entries.write();

        // Get created_at from existing versions (if any)
        let created_at = if let Some(versions) = entries.get(&key) {
            if let Some(latest) = versions.last() {
                latest.metadata.created_at
            } else {
                now
            }
        } else {
            now
        };

        let metadata = KvMetadata {
            created_at,
            updated_at: now,
            ttl_seconds: ttl,
            version,
        };

        let entry = KvEntry {
            key: key.clone(),
            value,
            metadata,
        };

        // Insert into version history, maintaining sorted order by version
        let versions = entries.entry(key).or_default();

        // Find insertion point to maintain sorted order
        let pos = versions.partition_point(|e| e.metadata.version < version);

        // Insert at correct position
        versions.insert(pos, entry);

        Ok(())
    }
}

/// Rebuild a [`KvStore`] by replaying the KV records of a WAL file
///
/// Every `KvSet` and `KvDelete` record found in the WAL is applied, in file
/// order, to a fresh in-memory store; all other record types are skipped.
/// This is called during NativeGraphBackend::open() to populate the in-memory
/// KvStore with persisted data from WAL.
///
/// # Arguments
/// * `wal_path` - Path to the WAL file
///
/// # Returns
/// A KvStore populated with recovered data, or an empty store if WAL doesn't exist
///
/// # Errors
/// * `KvStoreError::RecoveryFailed` if the WAL file exists but cannot be opened
/// * Any error returned while applying a `KvSet` record
///
/// A failure while *reading* a record does not produce an error: replay stops
/// at that point and the records applied so far are returned. Failures while
/// applying a `KvDelete` record are ignored.
pub fn recover_from_wal<P: AsRef<std::path::Path>>(wal_path: P) -> Result<KvStore, KvStoreError> {
    let path = wal_path.as_ref();

    // No WAL on disk means there is nothing to replay.
    if !path.exists() {
        return Ok(KvStore::new());
    }

    let mut reader = match crate::backend::native::v2::wal::V2WALReader::open(path) {
        Ok(opened) => opened,
        Err(e) => {
            return Err(KvStoreError::RecoveryFailed(format!(
                "Failed to open WAL: {}",
                e
            )));
        }
    };

    let mut recovered = KvStore::new();

    // Records are read with contiguity validation disabled (KV records may not
    // be in transactions). The loop ends at end-of-WAL (`Ok(None)`) and also,
    // silently, on the first read error.
    while let Ok(Some((lsn, record))) = reader.read_next_record_opt(false) {
        match record {
            crate::backend::native::v2::wal::V2WALRecord::KvSet {
                key,
                value_bytes,
                value_type,
                ttl_seconds,
                version,
            } => {
                // A zero version in the record means "not provided": fall back
                // to the record's LSN.
                let effective_version = if version != 0 { version } else { lsn };

                crate::backend::native::v2::kv_store::wal::apply_set(
                    &mut recovered,
                    key,
                    value_bytes,
                    value_type,
                    ttl_seconds,
                    effective_version,
                )?;
            }
            crate::backend::native::v2::wal::V2WALRecord::KvDelete { key, .. } => {
                // Best effort: the result is discarded (e.g. the key may not
                // be present). old_version is not needed for recovery, hence 0.
                let _ = crate::backend::native::v2::kv_store::wal::apply_delete(
                    &mut recovered,
                    key,
                    0,
                );
            }
            // Non-KV records are not relevant to the KV store.
            _ => {}
        }
    }

    Ok(recovered)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Key shared by the snapshot tests that write data.
    const KEY: &[u8] = b"key";

    /// Write `value` under `KEY` at an explicit `version`, panicking on failure.
    fn put(store: &mut KvStore, value: KvValue, ttl: Option<u64>, version: u64) {
        store
            .set_with_version(KEY.to_vec(), value, ttl, version)
            .unwrap();
    }

    /// Read `key` as seen by `snapshot`, panicking on a store error.
    fn read(store: &KvStore, key: &[u8], snapshot: SnapshotId) -> Option<KvValue> {
        store.get_at_snapshot(key, snapshot).unwrap()
    }

    #[test]
    fn test_new_store() {
        // A freshly created store holds no keys.
        assert_eq!(KvStore::new().len(), 0);
    }

    #[test]
    fn test_get_at_snapshot_visible() {
        // An entry is visible when its version <= the snapshot LSN.
        let mut kv = KvStore::new();
        put(&mut kv, KvValue::Integer(42), None, 100);

        // Version 100 is at or before snapshot 150.
        let seen = read(&kv, KEY, SnapshotId::from_lsn(150));
        assert_eq!(seen, Some(KvValue::Integer(42)));
    }

    #[test]
    fn test_get_at_snapshot_not_visible() {
        // An entry is hidden when its version > the snapshot LSN.
        let mut kv = KvStore::new();
        put(&mut kv, KvValue::Integer(42), None, 200);

        // Version 200 is after snapshot 150.
        let seen = read(&kv, KEY, SnapshotId::from_lsn(150));
        assert_eq!(seen, None);
    }

    #[test]
    fn test_get_at_snapshot_expired() {
        // An expired entry is hidden even though its version is visible.
        let mut kv = KvStore::new();

        // 1 second TTL, version 100.
        put(&mut kv, KvValue::Integer(42), Some(1), 100);

        // Wait past the TTL so the entry is expired by the time it is read.
        std::thread::sleep(std::time::Duration::from_secs(2));

        let seen = read(&kv, KEY, SnapshotId::from_lsn(150));
        assert_eq!(seen, None);
    }

    #[test]
    fn test_get_at_snapshot_missing_key() {
        // A key that was never written yields None.
        let kv = KvStore::new();

        let seen = read(&kv, b"missing", SnapshotId::from_lsn(100));
        assert_eq!(seen, None);
    }

    #[test]
    fn test_snapshot_isolation_multiple_versions() {
        // MVCC: both versions are retained and each snapshot sees the right one.
        let mut kv = KvStore::new();

        // Two versions of the same key; the value mirrors the version number.
        put(&mut kv, KvValue::Integer(100), None, 100);
        put(&mut kv, KvValue::Integer(200), None, 200);

        // Snapshot 250: latest version <= 250 is version 200.
        let at_250 = read(&kv, KEY, SnapshotId::from_lsn(250));
        assert_eq!(at_250, Some(KvValue::Integer(200)));

        // Snapshot 150: version 200 is too new, the retained version 100 is seen.
        let at_150 = read(&kv, KEY, SnapshotId::from_lsn(150));
        assert_eq!(at_150, Some(KvValue::Integer(100)));

        // Snapshot 50: every version is newer than the snapshot.
        let at_50 = read(&kv, KEY, SnapshotId::from_lsn(50));
        assert_eq!(at_50, None);
    }
}