sqlitegraph 2.0.7

Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and comprehensive graph algorithms library
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
//! Write-Ahead Logging (WAL) for V2 clustered edge format.
//!
//! This module implements high-performance WAL specifically designed for SQLiteGraph's
//! V2-native clustered edge file format, providing transactional durability, crash recovery,
//! and 5-10x write throughput improvement through sequential I/O patterns.
//!
//! # Architecture
//!
//! The WAL system is designed around V2's clustered edge architecture:
//! - **Cluster-Affinity Logging**: Groups operations by cluster to maintain I/O locality
//! - **Sequential Write Patterns**: Leverages V2's natural clustering for optimal throughput
//! - **Incremental Checkpointing**: Progressive dirty block flushing for sustained performance
//!
//! # Performance Targets
//!
//! - **Write Throughput**: 5-10x improvement over current V2 format
//! - **Commit Latency**: <1ms for small transactions
//! - **Recovery Time**: <1 second per 100MB WAL
//! - **Space Overhead**: <15% additional storage
//! - **Read Overhead**: <5% performance impact

pub mod bulk_ingest;
#[cfg(test)]
pub mod bulk_ingest_tests;
pub mod checkpoint;
pub mod graph_integration;
pub mod manager;
pub mod metrics;
pub mod performance;
pub mod reader;
pub mod record;
pub mod recovery;
pub mod transaction_coordinator;
pub mod tx_range_index;
pub mod v2_integration;
pub mod writer;

// Re-export core WAL types
pub use bulk_ingest::{BulkIngestConfig, BulkIngestExt, BulkIngestGuard, BulkIngestMetrics};
pub use checkpoint::V2WALCheckpointManager;
pub use graph_integration::{
    GraphOperationResult, GraphWALIntegrationConfig, NodeRecordV2WALExt, OperationMetrics,
    V2GraphWALIntegrator,
};
pub use manager::{V2WALManager, WALManagerMetrics};
pub use metrics::{V2WALMetrics, WALPerformanceCounters};
pub use performance::{
    AdaptivePerformanceTuner, ClusterAffinityOptimizer, ClusterAffinityStats, CompressionAlgorithm,
    CompressionStats, IOBatcher, IOBatcherStats, PerformanceConfig, WALRecordCompressor,
};
pub use reader::V2WALReader;
pub use record::{ContiguousRegion, V2WALRecord, V2WALRecordType, WALSerializationError};
pub use recovery::V2WALRecoveryEngine;
pub use transaction_coordinator::{
    IsolationLevel, LockType, ResourceId, TransactionContext, TransactionId, TransactionState,
    V2TransactionCoordinator,
};
pub use tx_range_index::{TxRange, TxRangeIndex, iter_visible_wal_records};
pub use v2_integration::{ChangeTracker, V2IntegrationConfig, V2WALIntegrator};
pub use writer::V2WALWriter;

use crate::backend::native::v2::storage::JsonLimits;
use crate::backend::native::{NativeBackendError, NativeResult};
use std::path::PathBuf;

/// V2 WAL configuration parameters.
///
/// `Default::default()` supplies the default listed on each field, and
/// [`V2WALConfig::for_graph_file`] derives the three paths from a graph file
/// location. [`V2WALConfig::validate`] checks `buffer_size`, `max_wal_size`,
/// `checkpoint_interval` and (only when compression is enabled)
/// `compression_level`; the remaining fields are not checked there.
#[derive(Debug, Clone)]
pub struct V2WALConfig {
    /// Path to the graph file (can be .db, .v2, or any extension).
    /// Default: `v2_graph.db` (a relative path).
    pub graph_path: PathBuf,

    /// Path to the main WAL file. Default: `v2_graph.wal`.
    pub wal_path: PathBuf,

    /// Path to the checkpoint tracking file. Default: `v2_graph.checkpoint`.
    pub checkpoint_path: PathBuf,

    /// Maximum WAL file size in bytes before forced checkpoint (default: 1GB).
    /// `validate` rejects values below 1MB (1024 * 1024).
    pub max_wal_size: u64,

    /// Write buffer size in bytes for optimal I/O alignment (default: 1MB).
    /// `validate` requires a non-zero power of two.
    pub buffer_size: usize,

    /// Checkpoint interval in number of transactions (default: 1000).
    /// `validate` rejects zero.
    pub checkpoint_interval: u64,

    /// Group commit timeout in milliseconds (default: 10ms)
    pub group_commit_timeout_ms: u64,

    /// Maximum number of records in group commit batch (default: 100)
    pub max_group_commit_size: usize,

    /// Enable compression for WAL records (default: false)
    pub enable_compression: bool,

    /// Compression level if enabled (1-9, default: 3).
    /// The range is only enforced by `validate` when `enable_compression` is true.
    pub compression_level: u8,

    /// Enable automatic checkpointing after commits (default: true)
    pub auto_checkpoint: bool,

    /// Spawn background thread for periodic checkpoint checks (default: false)
    pub background_checkpoint_thread: bool,

    /// Interval for background checkpoint checks in seconds (default: 60)
    pub background_checkpoint_interval_secs: u64,

    /// JSON input validation limits. Defaults to `JsonLimits::default()`
    /// (described here as 10MB size, 128 depth).
    /// NOTE(review): those numbers live in `JsonLimits`, not in this module — confirm.
    pub json_limits: JsonLimits,
}

impl Default for V2WALConfig {
    fn default() -> Self {
        Self {
            graph_path: PathBuf::from("v2_graph.db"),
            wal_path: PathBuf::from("v2_graph.wal"),
            checkpoint_path: PathBuf::from("v2_graph.checkpoint"),
            max_wal_size: 1024 * 1024 * 1024, // 1GB
            buffer_size: 1024 * 1024,         // 1MB
            checkpoint_interval: 1000,
            group_commit_timeout_ms: 10,
            max_group_commit_size: 100,
            enable_compression: false,
            compression_level: 3,
            auto_checkpoint: true,
            background_checkpoint_thread: false, // Opt-in for now
            background_checkpoint_interval_secs: 60,
            json_limits: JsonLimits::default(),
        }
    }
}

impl V2WALConfig {
    /// Create a WAL config whose WAL and checkpoint files sit next to `graph_path`.
    ///
    /// Only the *final* extension of the graph file is replaced, so
    /// `/data/test.graph` yields `/data/test.wal` and `/data/test.checkpoint`,
    /// while `/data/shard.1.db` yields `/data/shard.1.wal` and
    /// `/data/shard.1.checkpoint`. All other settings come from `Default`.
    ///
    /// Stripping the extension first and then calling `with_extension` again
    /// would remove two dot-separated suffixes, making `shard.1.db` and
    /// `shard.2.db` share a single `shard.wal`.
    ///
    /// If `graph_path` has no file name component, `Path::with_extension`
    /// returns the path unchanged, so all three paths end up identical; callers
    /// should pass a path that names a file.
    pub fn for_graph_file(graph_path: &std::path::Path) -> Self {
        Self {
            graph_path: graph_path.to_path_buf(),
            wal_path: graph_path.with_extension("wal"),
            checkpoint_path: graph_path.with_extension("checkpoint"),
            ..Default::default()
        }
    }

    /// Validate configuration parameters.
    ///
    /// Checks are performed in this order and the first failure is returned:
    /// 1. `buffer_size` is a non-zero power of two;
    /// 2. `max_wal_size` is at least 1MB (1024 * 1024 bytes);
    /// 3. `checkpoint_interval` is greater than zero;
    /// 4. when `enable_compression` is set, `compression_level` is within 1..=9.
    ///
    /// # Errors
    ///
    /// Returns `NativeBackendError::InvalidConfiguration` naming the offending
    /// parameter and the violated constraint.
    pub fn validate(&self) -> NativeResult<()> {
        // Single place that shapes the error so every check reads as one line.
        let invalid = |parameter: &str, reason: &str| NativeBackendError::InvalidConfiguration {
            parameter: parameter.to_string(),
            reason: reason.to_string(),
        };

        if self.buffer_size == 0 || !self.buffer_size.is_power_of_two() {
            return Err(invalid("buffer_size", "must be a non-zero power of two"));
        }

        if self.max_wal_size < 1024 * 1024 {
            return Err(invalid("max_wal_size", "must be at least 1MB"));
        }

        if self.checkpoint_interval == 0 {
            return Err(invalid("checkpoint_interval", "must be greater than zero"));
        }

        // The level is irrelevant (and therefore unchecked) while compression is off.
        if self.enable_compression && !(1..=9).contains(&self.compression_level) {
            return Err(invalid(
                "compression_level",
                "must be between 1 and 9 when compression is enabled",
            ));
        }

        Ok(())
    }

    /// Replace both JSON validation limits at once.
    pub fn with_json_limits(mut self, limits: JsonLimits) -> Self {
        self.json_limits = limits;
        self
    }

    /// Set the maximum JSON payload size, leaving the current depth limit unchanged.
    pub fn with_max_json_size(mut self, max_size: usize) -> Self {
        self.json_limits.max_size = max_size;
        self
    }

    /// Set the maximum JSON nesting depth, leaving the current size limit unchanged.
    pub fn with_max_json_depth(mut self, max_depth: usize) -> Self {
        self.json_limits.max_depth = max_depth;
        self
    }

    /// Borrow the JSON validation limits.
    pub fn json_limits(&self) -> &JsonLimits {
        &self.json_limits
    }
}

/// WAL file header for format identification and metadata.
///
/// The struct is `#[repr(C)]`, so fields keep their declaration order in memory.
/// NOTE(review): how the header is serialized to disk is not visible in this
/// module — confirm against the writer/reader before relying on the byte layout.
///
/// [`V2WALHeader::validate`] requires `current_lsn != 0` and
/// `checkpointed_lsn <= committed_lsn <= current_lsn`.
#[derive(Debug, Clone, Copy)]
#[repr(C)]
pub struct V2WALHeader {
    /// Magic bytes for V2 WAL format identification; must equal `V2WALHeader::MAGIC`.
    pub magic: [u8; 8],

    /// WAL format version; must equal `V2WALHeader::VERSION` to pass `validate`.
    pub version: u32,

    /// WAL file creation timestamp in whole seconds since the Unix epoch.
    /// `new` stores 0 if the system clock reads earlier than the epoch.
    pub created_at: u64,

    /// Current Log Sequence Number (LSN). `new` starts it at 1; zero is rejected
    /// by `validate`.
    pub current_lsn: u64,

    /// Committed LSN (all records up to this LSN are durable).
    /// Must not exceed `current_lsn`.
    pub committed_lsn: u64,

    /// Checkpointed LSN (all records up to this LSN are in main file).
    /// Must not exceed `committed_lsn`.
    pub checkpointed_lsn: u64,

    /// Number of active transactions
    pub active_transactions: u32,

    /// WAL file flags for feature toggles: a bitwise OR of the
    /// `V2WALHeader::FLAG_*` constants.
    pub flags: u32,

    /// Reserved for future use; `new` zero-fills it.
    pub reserved: [u64; 4],
}

impl V2WALHeader {
    /// Magic bytes for V2 WAL format: ASCII `V2WAL` followed by three NUL bytes.
    pub const MAGIC: [u8; 8] = *b"V2WAL\0\0\0";

    /// Current WAL format version
    pub const VERSION: u32 = 1;

    /// Flag bit 0: compression enabled
    pub const FLAG_COMPRESSION: u32 = 1 << 0;

    /// Flag bit 1: cluster affinity logging enabled
    pub const FLAG_CLUSTER_AFFINITY: u32 = 1 << 1;

    /// Flag bit 2: group commit enabled
    pub const FLAG_GROUP_COMMIT: u32 = 1 << 2;

    /// Build a header for a brand-new WAL.
    ///
    /// The header is stamped with the current wall-clock time, starts at
    /// `current_lsn == 1` with nothing committed or checkpointed, and has the
    /// cluster-affinity and group-commit flags set (compression is off).
    pub fn new() -> Self {
        // `duration_since` fails when the clock is before the epoch; record 0 then.
        let created_at = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)
        {
            Ok(since_epoch) => since_epoch.as_secs(),
            Err(_) => 0,
        };

        Self {
            magic: Self::MAGIC,
            version: Self::VERSION,
            created_at,
            current_lsn: 1,
            committed_lsn: 0,
            checkpointed_lsn: 0,
            active_transactions: 0,
            flags: Self::FLAG_CLUSTER_AFFINITY | Self::FLAG_GROUP_COMMIT,
            reserved: [0; 4],
        }
    }

    /// Check the header for internal consistency.
    ///
    /// Checks run in this order and the first failure wins: magic bytes, format
    /// version, non-zero `current_lsn`, `committed_lsn <= current_lsn`,
    /// `checkpointed_lsn <= committed_lsn`.
    ///
    /// # Errors
    ///
    /// Returns `NativeBackendError::VersionMismatch` for an unexpected version
    /// and `NativeBackendError::CorruptionDetected` for every other violation.
    pub fn validate(&self) -> NativeResult<()> {
        let corrupted = |context: String| NativeBackendError::CorruptionDetected {
            context,
            source: None,
        };

        if self.magic != Self::MAGIC {
            let context = format!("WAL header: invalid magic bytes: {:?}", self.magic);
            return Err(corrupted(context));
        }

        if self.version != Self::VERSION {
            return Err(NativeBackendError::VersionMismatch {
                expected: Self::VERSION.to_string(),
                found: self.version.to_string(),
                source: None,
            });
        }

        // The LSN invariants share an error shape, so pick the first violated
        // one (if any) and convert it in a single place.
        let lsn_violation = if self.current_lsn == 0 {
            Some("WAL header: current_lsn cannot be zero")
        } else if self.committed_lsn > self.current_lsn {
            Some("WAL header: committed_lsn cannot be greater than current_lsn")
        } else if self.checkpointed_lsn > self.committed_lsn {
            Some("WAL header: checkpointed_lsn cannot be greater than committed_lsn")
        } else {
            None
        };

        match lsn_violation {
            Some(message) => Err(corrupted(message.to_string())),
            None => Ok(()),
        }
    }

    /// Report whether any bit of `flag` is set in the header flags.
    pub fn has_flag(&self, flag: u32) -> bool {
        self.flags & flag != 0
    }

    /// Turn the bits of `flag` on (`enabled == true`) or off (`enabled == false`).
    pub fn set_flag(&mut self, flag: u32, enabled: bool) {
        self.flags = if enabled {
            self.flags | flag
        } else {
            self.flags & !flag
        };
    }
}

/// Helpers for Log Sequence Numbers (LSNs), which are plain `u64` values.
pub mod lsn {
    /// First LSN of a WAL; every valid LSN is at least this value.
    pub const LSN_BEGIN: u64 = 1;

    /// Sentinel for an invalid or uninitialized position.
    pub const LSN_INVALID: u64 = 0;

    /// Returns `true` when `lsn` is at or past [`LSN_BEGIN`].
    #[inline]
    pub fn is_valid(lsn: u64) -> bool {
        LSN_BEGIN <= lsn
    }

    /// Returns the LSN following `lsn`.
    ///
    /// The addition wraps: `next(u64::MAX)` is `0`, which equals [`LSN_INVALID`].
    #[inline]
    pub fn next(lsn: u64) -> u64 {
        let (successor, _wrapped) = lsn.overflowing_add(1);
        successor
    }

    /// Returns how far `to` lies ahead of `from`, or `0` when `to` is not ahead.
    #[inline]
    pub fn distance(from: u64, to: u64) -> u64 {
        if from >= to {
            0
        } else {
            to - from
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// The out-of-the-box configuration must satisfy its own validator.
    #[test]
    fn test_v2_wal_config_default() {
        assert!(V2WALConfig::default().validate().is_ok());
    }

    /// WAL and checkpoint paths are derived by swapping the graph file extension.
    #[test]
    fn test_v2_wal_config_for_graph_file() {
        let config = V2WALConfig::for_graph_file(Path::new("/tmp/test.graph"));

        assert_eq!(config.wal_path, PathBuf::from("/tmp/test.wal"));
        assert_eq!(config.checkpoint_path, PathBuf::from("/tmp/test.checkpoint"));
        assert!(config.validate().is_ok());
    }

    /// `buffer_size` must be a non-zero power of two.
    #[test]
    fn test_v2_wal_config_validation() {
        // 1023 is not a power of two; 0 is rejected outright.
        for rejected in [1023usize, 0] {
            let config = V2WALConfig {
                buffer_size: rejected,
                ..V2WALConfig::default()
            };
            assert!(config.validate().is_err());
        }

        let accepted = V2WALConfig {
            buffer_size: 4096,
            ..V2WALConfig::default()
        };
        assert!(accepted.validate().is_ok());
    }

    /// A freshly created header is valid and carries the expected feature flags.
    #[test]
    fn test_v2_wal_header() {
        let header = V2WALHeader::new();

        assert!(header.validate().is_ok());
        assert_eq!(header.magic, V2WALHeader::MAGIC);
        assert_eq!(header.version, V2WALHeader::VERSION);

        for enabled in [
            V2WALHeader::FLAG_CLUSTER_AFFINITY,
            V2WALHeader::FLAG_GROUP_COMMIT,
        ] {
            assert!(header.has_flag(enabled));
        }
        assert!(!header.has_flag(V2WALHeader::FLAG_COMPRESSION));
    }

    /// Validity, successor and distance helpers behave as documented.
    #[test]
    fn test_lsn_utilities() {
        use lsn::{LSN_BEGIN, LSN_INVALID};

        assert!(!lsn::is_valid(LSN_INVALID));
        assert!(lsn::is_valid(LSN_BEGIN));

        assert_eq!(lsn::next(LSN_BEGIN), LSN_BEGIN + 1);
        assert_eq!(lsn::distance(10, 15), 5);
        // A target behind the origin saturates at zero rather than underflowing.
        assert_eq!(lsn::distance(20, 15), 0);
    }
}