kglite 0.10.23

Pure-Rust knowledge graph engine — Cypher pipeline, snapshot/working CoW transactions, columnar/mmap/disk storage backends, optional dataset loaders (SEC EDGAR, Sodir, Wikidata). PyO3 wrappers live in the sibling kglite-py crate (the Python wheel); embeddable directly from any Rust binary without PyO3 in the dep tree.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
//! Write-ahead log for durable in-memory graphs (Stage 1 of the
//! embedded-Cypher-DB durability work).
//!
//! ## What this is
//!
//! A `.kgl-wal` sidecar holds an append-only sequence of **logical**
//! mutation frames. Each committed mutation operation appends one
//! [`WalFrame`] — a batch of [`MutationOp`]s tagged with the post-commit
//! graph `version` as its log-sequence number (LSN) — and `fsync`s. On
//! open, the engine loads the `.kgl` checkpoint snapshot, then replays
//! every WAL frame with `lsn > checkpoint.version` to recover work
//! committed since the last checkpoint. A checkpoint (a full `.kgl`
//! save) truncates the WAL.
//!
//! This module owns only the **on-disk format**: the op schema, the
//! frame envelope, and crash-safe read/write. Capture (translating
//! `GraphWrite` calls into ops) and replay (applying ops to a
//! `DirGraph`) live in their own modules — kept separate so the format
//! can be tested in isolation.
//!
//! ## Logical, identity-keyed ops
//!
//! Ops are keyed by **stable logical identity**, never by petgraph
//! `NodeIndex`/`EdgeIndex` (which do not survive checkpoint load or
//! compaction). A node is `(node_type, id)`; an edge is
//! `(conn_type, src, tgt)`. Both are unique in kglite's model, so the
//! two state-changing shapes are an idempotent **upsert** (add-or-replace
//! the full property set) and a **remove**. Idempotence means replaying a
//! frame twice is harmless — important for crash recovery, where the last
//! frame before a crash may or may not have been applied to the snapshot.
//!
//! ## Crash safety of the format
//!
//! The file starts with a 5-byte header (the `KWAL` magic plus one
//! format-version byte), followed by frames laid out as
//! `[len: u32 LE][crc32: u32 LE][payload: bincode(WalFrame)]`, where the
//! CRC covers the payload only.
//! A crash mid-append leaves a torn trailing frame; [`read_frames`] stops
//! at the first short read, CRC mismatch, or undecodable payload and
//! returns every frame before it. A torn frame is therefore *discarded*,
//! never half-applied — the atomic unit of durability is the whole frame,
//! committed by the `fsync` that follows its append.

use std::fs::{File, OpenOptions};
use std::io::{self, BufReader, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::OnceLock;

use serde::{Deserialize, Serialize};

use crate::datatypes::Value;

/// Magic bytes that open every kglite WAL sidecar; they spell `KWAL` in
/// ASCII.
pub const WAL_MAGIC: [u8; 4] = [b'K', b'W', b'A', b'L'];

/// Layout version of the WAL file, stored as the single byte right after
/// [`WAL_MAGIC`]. Only a breaking change to the frame layout bumps it. A
/// WAL is truncated at every checkpoint, so it is a short-lived recovery
/// artefact rather than a long-term archival format like `.kgl`.
pub const WAL_FORMAT_VERSION: u8 = 1;

/// One logical, identity-keyed mutation. See the module docs for why
/// the state-changing shapes are idempotent upserts.
///
/// Ops are serialized with bincode, which is not self-describing.
/// Reordering variants or fields therefore changes the frame payload
/// layout, and such a change must come with a [`WAL_FORMAT_VERSION`] bump.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum MutationOp {
    /// Add-or-replace a node identified by `(node_type, id)` with the
    /// full given title + property set.
    UpsertNode {
        /// Type half of the node's logical identity.
        node_type: String,
        /// `id` half of the node's logical identity.
        id: Value,
        /// Title the node carries after the upsert.
        title: Value,
        /// The node's complete property set after the upsert (replaces,
        /// rather than patches, the previous set).
        properties: Vec<(String, Value)>,
    },
    /// Remove the node identified by `(node_type, id)`, if present.
    RemoveNode { node_type: String, id: Value },
    /// Add-or-replace the edge `(conn_type, src, tgt)` with the full
    /// given property set. Endpoints are named by their logical
    /// `(node_type, id)`.
    UpsertEdge {
        /// Connection (edge) type.
        conn_type: String,
        /// Source endpoint's node type.
        src_type: String,
        /// Source endpoint's `id`.
        src_id: Value,
        /// Target endpoint's node type.
        tgt_type: String,
        /// Target endpoint's `id`.
        tgt_id: Value,
        /// The edge's complete property set after the upsert.
        properties: Vec<(String, Value)>,
    },
    /// Remove the edge `(conn_type, src, tgt)`, if present.
    RemoveEdge {
        /// Connection (edge) type.
        conn_type: String,
        /// Source endpoint's node type.
        src_type: String,
        /// Source endpoint's `id`.
        src_id: Value,
        /// Target endpoint's node type.
        tgt_type: String,
        /// Target endpoint's `id`.
        tgt_id: Value,
    },
}

/// One committed mutation operation: the ops it produced, tagged with
/// the post-commit graph version as the log-sequence number.
///
/// This struct is the bincode payload of one on-disk frame (see
/// [`append_frame`]). Field order is part of that layout.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct WalFrame {
    /// Post-commit graph `version`. Frames replay in ascending `lsn`;
    /// on recovery, frames with `lsn <= checkpoint_version` are already
    /// folded into the snapshot and skipped.
    pub lsn: u64,
    /// The logical ops this commit produced, in application order. May
    /// be empty.
    pub ops: Vec<MutationOp>,
}

// ─────────────────────────────────────────────────────────────────────
// CRC32 (IEEE 802.3, polynomial 0xEDB88320) — dependency-free, table-
// backed. Deterministic across processes/builds (unlike DefaultHasher),
// which the torn-frame check relies on.
// ─────────────────────────────────────────────────────────────────────

/// Lazily built 256-entry lookup table for the reflected CRC32/IEEE
/// polynomial `0xEDB88320`. Entry `i` is the CRC remainder of the single
/// byte `i`.
fn crc32_table() -> &'static [u32; 256] {
    static TABLE: OnceLock<[u32; 256]> = OnceLock::new();
    TABLE.get_or_init(|| {
        let mut entries = [0u32; 256];
        for (byte, entry) in entries.iter_mut().enumerate() {
            // Shift the byte through the polynomial one bit at a time,
            // eight times.
            *entry = (0..8).fold(byte as u32, |acc, _| {
                let shifted = acc >> 1;
                if acc & 1 == 1 {
                    shifted ^ 0xEDB8_8320
                } else {
                    shifted
                }
            });
        }
        entries
    })
}

/// CRC32 (IEEE 802.3) checksum of `data`, used as the per-frame
/// integrity check.
///
/// It uses the standard all-ones initial value and final inversion, so the
/// result is deterministic across processes and builds.
pub fn crc32(data: &[u8]) -> u32 {
    let table = crc32_table();
    let folded = data.iter().fold(!0u32, |acc, &byte| {
        let index = usize::from((acc as u8) ^ byte);
        table[index] ^ (acc >> 8)
    });
    !folded
}

// ─────────────────────────────────────────────────────────────────────
// Write side
// ─────────────────────────────────────────────────────────────────────

/// Emit the WAL file header: the four [`WAL_MAGIC`] bytes followed by the
/// one-byte [`WAL_FORMAT_VERSION`].
///
/// Call this exactly once, on a freshly created or truncated WAL, before
/// any [`append_frame`].
pub fn write_header(w: &mut impl Write) -> io::Result<()> {
    let version = [WAL_FORMAT_VERSION];
    for part in [&WAL_MAGIC[..], &version[..]] {
        w.write_all(part)?;
    }
    Ok(())
}

/// Append one frame, `[len: u32 LE][crc32: u32 LE][payload]`, to `w`.
///
/// The whole record is assembled in memory and passed to `w` in a single
/// `write_all`. An unbuffered `File` therefore receives one write per
/// frame instead of three separate writes for length, CRC and payload.
///
/// The caller is responsible for `flush`/`fsync` after the append to make
/// it durable. This fn only writes the bytes, so a batch of frames can
/// share one fsync if the caller wants.
///
/// # Errors
/// Returns `InvalidData` if the frame fails to serialize or its payload
/// exceeds `u32::MAX` bytes; nothing is written in either case. Otherwise
/// returns any error from `w`.
pub fn append_frame(w: &mut impl Write, frame: &WalFrame) -> io::Result<()> {
    let payload =
        bincode::serialize(frame).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
    let len = u32::try_from(payload.len())
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "WAL frame exceeds 4 GiB"))?;
    let crc = crc32(&payload);

    // 4-byte length + 4-byte CRC prefix, then the payload.
    let mut record = Vec::with_capacity(8 + payload.len());
    record.extend_from_slice(&len.to_le_bytes());
    record.extend_from_slice(&crc.to_le_bytes());
    record.extend_from_slice(&payload);
    w.write_all(&record)
}

// ─────────────────────────────────────────────────────────────────────
// Read side
// ─────────────────────────────────────────────────────────────────────

/// Fill `buf` completely from `r`.
///
/// Returns `Ok(Some(()))` on success and `Ok(None)` when the reader runs
/// dry first, whether it was already at EOF or ended partway through the
/// buffer. Either way the frame stream is over. Any other I/O error is
/// returned unchanged.
fn read_exact_opt(r: &mut impl Read, buf: &mut [u8]) -> io::Result<Option<()>> {
    r.read_exact(buf).map(Some).or_else(|err| {
        if err.kind() == io::ErrorKind::UnexpectedEof {
            Ok(None)
        } else {
            Err(err)
        }
    })
}

/// Consume and check the WAL header, returning its format-version byte.
///
/// The magic is verified before the version byte is read.
///
/// # Errors
/// Returns `InvalidData` when the first four bytes are not [`WAL_MAGIC`].
/// Any reader error is also returned, including `UnexpectedEof` when the
/// input ends early. An empty reader (0 bytes) is therefore rejected: a
/// WAL file should always carry at least a header.
pub fn read_header(r: &mut impl Read) -> io::Result<u8> {
    let mut found = [0u8; 4];
    r.read_exact(&mut found)?;
    if found != WAL_MAGIC {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            "not a kglite WAL file (bad magic)",
        ));
    }

    let mut version = [0u8; 1];
    r.read_exact(&mut version)?;
    let [format] = version;
    Ok(format)
}

/// Read every intact frame from `r`, which must be positioned at the
/// start of the file.
///
/// The header is read and validated first. Frames are then read until a
/// clean EOF or the first torn/corrupt frame (short read, CRC mismatch or
/// undecodable payload). That frame and anything after it are discarded,
/// modelling a crash mid-append. Returns the recovered frames in file
/// order.
///
/// The length prefix of a torn frame is untrusted. The payload is
/// therefore read through a `take(len)` limit into a buffer that only
/// grows with the bytes actually present. A garbage length cannot force a
/// multi-GiB allocation during recovery.
///
/// # Errors
/// Returns header errors from [`read_header`], `InvalidData` for an
/// unsupported format version, and any I/O error other than an early EOF.
pub fn read_frames(mut r: impl Read) -> io::Result<Vec<WalFrame>> {
    // Largest payload buffer reserved up front. Bigger (legitimate)
    // frames still read fine; the buffer just grows as bytes arrive.
    const MAX_PREALLOC: usize = 1 << 20;

    let version = read_header(&mut r)?;
    if version != WAL_FORMAT_VERSION {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!(
                "unsupported WAL format version {version} (this build writes \
                 v{WAL_FORMAT_VERSION}); checkpoint it with the build that wrote it first"
            ),
        ));
    }

    let mut frames = Vec::new();
    loop {
        let mut len_buf = [0u8; 4];
        if read_exact_opt(&mut r, &mut len_buf)?.is_none() {
            break; // clean EOF or torn length prefix
        }
        let mut crc_buf = [0u8; 4];
        if read_exact_opt(&mut r, &mut crc_buf)?.is_none() {
            break; // torn: length present, crc missing
        }
        let len = u32::from_le_bytes(len_buf);
        let expected_crc = u32::from_le_bytes(crc_buf);

        let reserve = usize::try_from(len)
            .unwrap_or(usize::MAX)
            .min(MAX_PREALLOC);
        let mut payload = Vec::with_capacity(reserve);
        r.by_ref().take(u64::from(len)).read_to_end(&mut payload)?;
        if payload.len() as u64 != u64::from(len) {
            break; // torn: payload short
        }
        if crc32(&payload) != expected_crc {
            break; // corrupt/torn payload — stop here
        }
        match bincode::deserialize::<WalFrame>(&payload) {
            Ok(frame) => frames.push(frame),
            Err(_) => break, // unparseable payload — treat as torn tail
        }
    }
    Ok(frames)
}

// ─────────────────────────────────────────────────────────────────────
// File handle — session-scoped append log
// ─────────────────────────────────────────────────────────────────────

/// Path of the WAL sidecar for the checkpoint at `checkpoint`.
///
/// It is the checkpoint path with `-wal` appended verbatim (for example
/// `graph.kgl` becomes `graph.kgl-wal`). The log therefore always sits
/// next to its snapshot, and either file can be located from the other.
pub fn wal_path(checkpoint: &Path) -> PathBuf {
    let mut sidecar = checkpoint.as_os_str().to_os_string();
    sidecar.push("-wal");
    sidecar.into()
}

/// Read every intact frame from the WAL at `path` for crash recovery.
///
/// Returns no frames in two cases:
/// - The file is missing: the graph was never mutated since its
///   checkpoint.
/// - The file is zero-length. This can happen after a crash between
///   creating/truncating the file (`Wal::open` / `Wal::reset`) and writing
///   its header. Such a file has nothing to replay, so it is not treated
///   as a corrupt WAL.
///
/// Otherwise this stops at the first torn/corrupt frame (see
/// [`read_frames`]).
///
/// # Errors
/// Returns I/O errors other than `NotFound`, plus any error from
/// [`read_frames`] (e.g. bad magic or an unsupported format version).
pub fn recover(path: &Path) -> io::Result<Vec<WalFrame>> {
    let file = match File::open(path) {
        Ok(f) => f,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };
    if file.metadata()?.len() == 0 {
        return Ok(Vec::new());
    }
    read_frames(BufReader::new(file))
}

/// An open, append-only WAL file. Session-scoped (one per open graph
/// file) — it owns a `File` handle, so it lives *outside* the CoW-cloned
/// `DirGraph` (which must stay `Clone`). Each [`append`](Self::append)
/// writes a frame and `fsync`s, making the committed mutation durable
/// before the call returns.
#[derive(Debug)]
pub struct Wal {
    file: File,
    path: PathBuf,
}

impl Wal {
    /// Open the WAL at `path` for appending, creating it with a fresh
    /// header if absent. An existing WAL is opened in append mode with its
    /// frames intact — call [`recover`] *before* opening if you need to
    /// replay them.
    pub fn open(path: PathBuf) -> io::Result<Self> {
        let existed = path.exists();
        let mut file = OpenOptions::new()
            .create(true)
            .read(true)
            .append(true)
            .open(&path)?;
        if !existed {
            write_header(&mut file)?;
            file.sync_all()?;
        }
        Ok(Self { file, path })
    }

    /// Append one frame and `fsync` — the durability point. Returns only
    /// after the bytes are on stable storage.
    pub fn append(&mut self, frame: &WalFrame) -> io::Result<()> {
        append_frame(&mut self.file, frame)?;
        self.file.flush()?;
        self.file.sync_data()?;
        Ok(())
    }

    /// Reset to an empty WAL (header only), `fsync`ing the truncation.
    /// Called after a checkpoint (a full `.kgl` save) has folded every
    /// frame into the snapshot, so the log can start fresh.
    pub fn reset(&mut self) -> io::Result<()> {
        self.file.set_len(0)?;
        write_header(&mut self.file)?;
        self.file.sync_all()?;
        Ok(())
    }

    /// The WAL's filesystem path.
    pub fn path(&self) -> &Path {
        &self.path
    }
}

// ─────────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    // Tests for the WAL byte format (round-trips through in-memory
    // buffers, torn/corrupt-tail handling, header validation) and for the
    // file-backed `Wal` handle (reopen, reset, missing-file recovery,
    // sidecar path naming).
    use super::*;
    use std::io::Cursor;
    use tempfile::TempDir;

    /// Representative ops covering `UpsertNode`, `UpsertEdge` and
    /// `RemoveNode` (`RemoveEdge` is not exercised here).
    fn sample_ops() -> Vec<MutationOp> {
        vec![
            MutationOp::UpsertNode {
                node_type: "Person".to_string(),
                id: Value::Int64(1),
                title: Value::String("Alice".to_string()),
                properties: vec![
                    ("age".to_string(), Value::Int64(30)),
                    ("city".to_string(), Value::String("Oslo".to_string())),
                ],
            },
            MutationOp::UpsertEdge {
                conn_type: "KNOWS".to_string(),
                src_type: "Person".to_string(),
                src_id: Value::Int64(1),
                tgt_type: "Person".to_string(),
                tgt_id: Value::Int64(2),
                properties: vec![("since".to_string(), Value::Int64(2020))],
            },
            MutationOp::RemoveNode {
                node_type: "Person".to_string(),
                id: Value::Int64(9),
            },
        ]
    }

    /// Build a complete WAL image in memory: the header followed by one
    /// frame per entry of `frames`.
    fn write_wal(frames: &[WalFrame]) -> Vec<u8> {
        let mut buf = Vec::new();
        write_header(&mut buf).unwrap();
        for f in frames {
            append_frame(&mut buf, f).unwrap();
        }
        buf
    }

    #[test]
    fn crc32_matches_known_vector() {
        // CRC32/IEEE of "123456789" is the standard check value; the empty
        // input checksums to 0 because the initial value and final XOR
        // cancel out.
        assert_eq!(crc32(b"123456789"), 0xCBF4_3926);
        assert_eq!(crc32(b""), 0);
    }

    #[test]
    fn single_frame_round_trips() {
        let frame = WalFrame {
            lsn: 1,
            ops: sample_ops(),
        };
        let bytes = write_wal(std::slice::from_ref(&frame));
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert_eq!(got, vec![frame]);
    }

    #[test]
    fn multiple_frames_preserve_order() {
        // Includes a frame with an empty op list to check zero-op frames
        // round-trip too.
        let frames = vec![
            WalFrame {
                lsn: 1,
                ops: vec![MutationOp::RemoveNode {
                    node_type: "T".into(),
                    id: Value::Int64(1),
                }],
            },
            WalFrame {
                lsn: 2,
                ops: sample_ops(),
            },
            WalFrame {
                lsn: 3,
                ops: vec![],
            },
        ];
        let bytes = write_wal(&frames);
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert_eq!(got, frames);
    }

    #[test]
    fn torn_trailing_frame_is_discarded() {
        let frames = vec![
            WalFrame {
                lsn: 1,
                ops: sample_ops(),
            },
            WalFrame {
                lsn: 2,
                ops: sample_ops(),
            },
        ];
        let mut bytes = write_wal(&frames);
        // Simulate a crash mid-append: lop off the last 5 bytes of the
        // final frame's payload.
        bytes.truncate(bytes.len() - 5);
        let got = read_frames(Cursor::new(bytes)).unwrap();
        // Only the first, fully-written frame survives.
        assert_eq!(got, vec![frames[0].clone()]);
    }

    #[test]
    fn truncated_in_length_prefix_is_clean_stop() {
        let frames = vec![WalFrame {
            lsn: 1,
            ops: sample_ops(),
        }];
        let mut bytes = write_wal(&frames);
        // Append a stray partial length prefix (2 of 4 bytes) — a crash
        // before even the length was fully written.
        bytes.extend_from_slice(&[0u8, 0u8]);
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert_eq!(got, frames);
    }

    #[test]
    fn corrupt_payload_crc_mismatch_stops() {
        let frame = WalFrame {
            lsn: 1,
            ops: sample_ops(),
        };
        let mut bytes = write_wal(std::slice::from_ref(&frame));
        // Flip the final byte, which lies inside the payload (after the
        // 5-byte header + 8-byte len/crc prefix) — CRC must catch it and
        // drop the frame.
        let last = bytes.len() - 1;
        bytes[last] ^= 0xFF;
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert!(got.is_empty(), "corrupt frame must not be returned");
    }

    #[test]
    fn header_only_wal_yields_no_frames() {
        let bytes = write_wal(&[]);
        let got = read_frames(Cursor::new(bytes)).unwrap();
        assert!(got.is_empty());
    }

    #[test]
    fn bad_magic_is_rejected() {
        // Correct length and version byte, wrong magic.
        let bytes = b"XXXX\x01".to_vec();
        assert!(read_frames(Cursor::new(bytes)).is_err());
    }

    #[test]
    fn empty_reader_is_error() {
        // `read_frames` itself requires a header; an empty input is an error.
        let bytes: Vec<u8> = Vec::new();
        assert!(read_frames(Cursor::new(bytes)).is_err());
    }

    // ── file handle ──────────────────────────────────────────────────

    /// A frame carrying `sample_ops()` at the given `lsn`.
    fn frame(lsn: u64) -> WalFrame {
        WalFrame {
            lsn,
            ops: sample_ops(),
        }
    }

    #[test]
    fn open_creates_with_header_and_appends_survive_reopen() {
        let dir = TempDir::new().unwrap();
        let p = dir.path().join("g.kgl-wal");
        {
            let mut wal = Wal::open(p.clone()).unwrap();
            wal.append(&frame(1)).unwrap();
            wal.append(&frame(2)).unwrap();
        } // drop closes the file

        // Reopen for append (must NOT clobber existing frames)...
        {
            let mut wal = Wal::open(p.clone()).unwrap();
            wal.append(&frame(3)).unwrap();
        }
        // ...and all three frames are recovered in order.
        let frames = recover(&p).unwrap();
        assert_eq!(frames.iter().map(|f| f.lsn).collect::<Vec<_>>(), [1, 2, 3]);
    }

    #[test]
    fn reset_truncates_to_header_only() {
        let dir = TempDir::new().unwrap();
        let p = dir.path().join("g.kgl-wal");
        let mut wal = Wal::open(p.clone()).unwrap();
        wal.append(&frame(1)).unwrap();
        wal.append(&frame(2)).unwrap();
        wal.reset().unwrap();
        assert!(recover(&p).unwrap().is_empty());
        // Still usable after reset.
        wal.append(&frame(5)).unwrap();
        assert_eq!(
            recover(&p)
                .unwrap()
                .iter()
                .map(|f| f.lsn)
                .collect::<Vec<_>>(),
            [5]
        );
    }

    #[test]
    fn recover_missing_file_is_empty() {
        let dir = TempDir::new().unwrap();
        let p = dir.path().join("does-not-exist.kgl-wal");
        assert!(recover(&p).unwrap().is_empty());
    }

    #[test]
    fn wal_path_appends_suffix() {
        assert_eq!(
            wal_path(Path::new("/data/graph.kgl")),
            PathBuf::from("/data/graph.kgl-wal")
        );
    }
}