picante 2.0.0

An async incremental query runtime
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
//! Write-ahead log (WAL) for incremental cache persistence.
//!
//! This module provides append-only persistence that writes only changes
//! instead of rewriting the entire cache. The WAL builds on top of periodic
//! snapshots created by the regular `save_cache` function.
//!
//! # Architecture
//!
//! - **Base snapshot**: Created via `save_cache`; contains the full cache state at a given revision
//! - **WAL entries**: Appended after the snapshot; record changes made since the base revision
//! - **Compaction**: Periodically creates a new snapshot and discards the old WAL
//!
//! # Limitations
//!
//! **Interned ingredients**: Interned values are NOT included in incremental WAL entries.
//! They are only persisted in base snapshots. This is because interned values are typically
//! small and infrequently changing, so the overhead of incremental tracking doesn't justify
//! the complexity. If you're using interned ingredients, be aware that new intern operations
//! between snapshots will not be persisted to the WAL.
//!
//! # Usage
//!
//! ```rust,ignore
//! use picante::persist::{save_cache, load_cache, append_to_wal, replay_wal, compact_wal};
//! use picante::wal::WalWriter;
//!
//! // Initial setup: create a base snapshot
//! save_cache("cache.bin", &runtime, &ingredients).await?;
//! let base_revision = runtime.current_revision().0;
//!
//! // Create a WAL file associated with that snapshot
//! let mut wal = WalWriter::create("cache.wal", base_revision)?;
//!
//! // As you make changes to the database, periodically append to the WAL
//! // (typically after a batch of operations or at regular intervals)
//! append_to_wal(&mut wal, &runtime, &ingredients).await?;
//!
//! // On next startup, load the snapshot and then replay the WAL
//! load_cache("cache.bin", &runtime, &ingredients).await?;
//! replay_wal("cache.wal", &runtime, &ingredients).await?;
//!
//! // When the WAL grows too large, compact it by creating a new snapshot
//! compact_wal("cache.bin", "cache.wal", &runtime, &ingredients, &options, true).await?;
//! ```

use crate::{PicanteError, PicanteResult};
use facet::Facet;
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

/// Version number stamped into every WAL header.
/// Bump this whenever the on-disk layout changes incompatibly.
const WAL_FORMAT_VERSION: u32 = 1;

/// Eight-byte signature every WAL file begins with; checked when a WAL is opened.
const WAL_MAGIC: &[u8; 8] = b"PICANTE\x00";

/// Header written at the start of every WAL file.
///
/// On disk it follows the 8-byte `WAL_MAGIC` and a little-endian `u32` length
/// prefix, and is encoded with `facet_postcard`.
///
/// NOTE(review): the postcard encoding is presumably positional, so reordering or
/// retyping these fields should be treated as a format change and paired with a
/// bump of `WAL_FORMAT_VERSION`.
#[derive(Debug, Clone, Facet)]
pub struct WalHeader {
    /// Format version for compatibility checking; `WalReader::open` rejects any
    /// value other than `WAL_FORMAT_VERSION`.
    pub format_version: u32,
    /// The revision of the base snapshot this WAL builds upon
    pub base_revision: u64,
}

/// A single entry in the write-ahead log.
///
/// Each entry is stored as its own frame: a little-endian `u32` byte length
/// followed by the `facet_postcard` encoding of this struct.
///
/// NOTE(review): as with `WalHeader`, field order/types presumably define the
/// encoding — changing them should come with a `WAL_FORMAT_VERSION` bump.
#[derive(Debug, Clone, Facet)]
pub struct WalEntry {
    /// The revision when this change occurred
    pub revision: u64,
    /// The ingredient kind that owns this entry
    pub kind_id: u32,
    /// The operation performed
    pub operation: WalOperation,
}

/// Operations that can be recorded in the WAL.
///
/// Keys and values are carried as opaque, already-serialized bytes; nothing in
/// this module decodes them.
///
/// NOTE(review): `#[repr(u8)]` is presumably needed by the `Facet` derive for
/// enums — confirm before removing it. Variant order likely determines the
/// encoded discriminant, so new variants belong at the end.
#[repr(u8)]
#[derive(Debug, Clone, Facet)]
pub enum WalOperation {
    /// Insert or update a key-value pair
    Set {
        /// Serialized key
        key: Vec<u8>,
        /// Serialized value
        value: Vec<u8>,
    },
    /// Delete a key
    Delete {
        /// Serialized key
        key: Vec<u8>,
    },
}

/// Append-only writer for a WAL file.
///
/// Entries are serialized into an in-memory buffer. The buffer is pushed to the
/// file once `auto_flush_threshold` entries have accumulated, on an explicit
/// `flush()`, or when the writer is dropped.
pub struct WalWriter {
    /// Number of buffered entries that triggers an automatic flush (default: 100).
    pub auto_flush_threshold: usize,
    /// Entries appended since the last successful flush.
    entries_since_flush: usize,
    /// Revision of the base snapshot this WAL extends.
    base_revision: u64,
    /// Buffered handle to the open WAL file.
    writer: BufWriter<File>,
    /// Location of the WAL file, kept for the `path()` accessor and diagnostics.
    path: PathBuf,
}

impl WalWriter {
    /// Default auto-flush threshold (number of entries before auto-flush).
    ///
    /// This value of 100 balances write performance with data durability.
    /// Lower values increase durability but may reduce write throughput.
    /// Higher values improve performance but increase risk of data loss on crash.
    pub const DEFAULT_AUTO_FLUSH_THRESHOLD: usize = 100;

    /// Largest serialized entry, in bytes, that [`append`](Self::append) accepts.
    ///
    /// Mirrors the per-entry sanity limit enforced by `WalReader::next_entry`.
    /// Without this check an oversized entry would be written successfully but
    /// rejected on replay. Keep the two limits in sync.
    const MAX_ENTRY_LEN: usize = 100_000_000;

    /// Create a new WAL file with default settings.
    ///
    /// Uses the default auto-flush threshold. For custom settings, use
    /// `create_with_threshold()`.
    ///
    /// If a file already exists at this path, it will be truncated.
    pub fn create(path: impl AsRef<Path>, base_revision: u64) -> PicanteResult<Self> {
        Self::create_with_threshold(path, base_revision, Self::DEFAULT_AUTO_FLUSH_THRESHOLD)
    }

    /// Create a new WAL file with a custom auto-flush threshold.
    ///
    /// The file starts with `WAL_MAGIC`, then a little-endian `u32` header
    /// length, then the serialized [`WalHeader`]. The header is flushed before
    /// this returns.
    ///
    /// The `auto_flush_threshold` determines how many entries are buffered
    /// before automatically flushing to the file (a value of 0 flushes after
    /// every append):
    ///
    /// - **Lower values (e.g., 10-50)**: Better durability, less data loss on crash,
    ///   but more disk I/O overhead
    /// - **Higher values (e.g., 200-1000)**: Better write performance, but more
    ///   entries could be lost if the process crashes before flush
    ///
    /// If a file already exists at this path, it will be truncated.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be created, or if the header cannot
    /// be encoded, written, or flushed.
    pub fn create_with_threshold(
        path: impl AsRef<Path>,
        base_revision: u64,
        auto_flush_threshold: usize,
    ) -> PicanteResult<Self> {
        let path = path.as_ref().to_path_buf();

        let file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)
            .map_err(|e| {
                Arc::new(PicanteError::Cache {
                    message: format!("Failed to create WAL file at {}: {}", path.display(), e),
                })
            })?;

        let mut writer = BufWriter::new(file);

        // Write magic bytes
        writer.write_all(WAL_MAGIC).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to write WAL magic bytes: {}", e),
            })
        })?;

        // Write header
        let header = WalHeader {
            format_version: WAL_FORMAT_VERSION,
            base_revision,
        };
        let header_bytes = facet_postcard::to_vec(&header).map_err(|e| {
            Arc::new(PicanteError::Encode {
                what: "WAL header",
                message: format!("{}", e),
            })
        })?;

        // Write header length as u32, then header. The header holds only two
        // integers, so its serialized length is tiny and always fits in a u32.
        let header_len = header_bytes.len() as u32;
        writer.write_all(&header_len.to_le_bytes()).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to write WAL header length: {}", e),
            })
        })?;
        writer.write_all(&header_bytes).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to write WAL header: {}", e),
            })
        })?;

        writer.flush().map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to flush WAL header: {}", e),
            })
        })?;

        Ok(Self {
            path,
            writer,
            base_revision,
            entries_since_flush: 0,
            auto_flush_threshold,
        })
    }

    /// Append a WAL entry to the log.
    ///
    /// The entry is written as a frame: a little-endian `u32` byte length
    /// followed by the serialized entry. The frame is buffered and flushed when
    /// `auto_flush_threshold` is reached or when `flush()` is called explicitly.
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    ///
    /// - the entry cannot be serialized;
    /// - its serialized form exceeds the limit the reader will accept (nothing
    ///   is written in that case);
    /// - writing the frame or the automatic flush fails.
    pub fn append(&mut self, entry: WalEntry) -> PicanteResult<()> {
        // Serialize the entry
        let entry_bytes = facet_postcard::to_vec(&entry).map_err(|e| {
            Arc::new(PicanteError::Encode {
                what: "WAL entry",
                message: format!("{}", e),
            })
        })?;

        // Reject oversized entries before touching the file. An entry the reader
        // refuses would make replay fail at this point in the log. A length over
        // u32::MAX would also be silently truncated by the cast below, which
        // would corrupt the framing.
        if entry_bytes.len() > Self::MAX_ENTRY_LEN {
            return Err(Arc::new(PicanteError::Encode {
                what: "WAL entry",
                message: format!(
                    "serialized entry is {} bytes, exceeding the maximum of {} bytes",
                    entry_bytes.len(),
                    Self::MAX_ENTRY_LEN
                ),
            }));
        }

        // Cannot truncate: MAX_ENTRY_LEN is well below u32::MAX.
        let entry_len = entry_bytes.len() as u32;
        self.writer
            .write_all(&entry_len.to_le_bytes())
            .map_err(|e| {
                Arc::new(PicanteError::Cache {
                    message: format!("Failed to write WAL entry length: {}", e),
                })
            })?;
        self.writer.write_all(&entry_bytes).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to write WAL entry: {}", e),
            })
        })?;

        self.entries_since_flush += 1;

        // Auto-flush if threshold reached
        if self.entries_since_flush >= self.auto_flush_threshold {
            self.flush()?;
        }

        Ok(())
    }

    /// Flush buffered entries from the in-memory buffer to the file.
    ///
    /// This hands the data to the operating system but does not force it onto
    /// stable storage. It protects against a process crash, not an OS crash or
    /// power loss; use [`sync`](Self::sync) for that.
    pub fn flush(&mut self) -> PicanteResult<()> {
        self.writer.flush().map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to flush WAL: {}", e),
            })
        })?;
        self.entries_since_flush = 0;
        Ok(())
    }

    /// Flush buffered entries and ask the OS to persist the file's data to disk.
    ///
    /// Call this at points where appended entries must survive an OS crash or
    /// power loss, not just a process crash. It is more expensive than `flush()`.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing or `sync_data` fails.
    pub fn sync(&mut self) -> PicanteResult<()> {
        self.flush()?;
        self.writer.get_ref().sync_data().map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to sync WAL to disk: {}", e),
            })
        })
    }

    /// Get the base revision this WAL builds upon.
    pub fn base_revision(&self) -> u64 {
        self.base_revision
    }

    /// Get the path to the WAL file.
    pub fn path(&self) -> &Path {
        &self.path
    }
}

impl Drop for WalWriter {
    /// Flushes any still-buffered entries when the writer goes out of scope.
    ///
    /// `drop` has no way to return an error, so a failed flush is reported
    /// through `tracing` rather than propagated.
    fn drop(&mut self) {
        match self.flush() {
            Ok(()) => {}
            Err(err) => {
                tracing::warn!(
                    path = %self.path.display(),
                    error = %err,
                    "Failed to flush WAL on drop - unflushed entries may be lost"
                );
            }
        }
    }
}

/// Reader for replaying WAL entries.
///
/// [`open`](Self::open) validates the magic bytes, header length, and format
/// version. Entries are then read sequentially with
/// [`next_entry`](Self::next_entry) or [`entries`](Self::entries).
#[derive(Debug)]
pub struct WalReader {
    #[allow(dead_code)] // Kept for future diagnostics
    path: PathBuf,
    reader: BufReader<File>,
    header: WalHeader,
}

impl WalReader {
    /// Open an existing WAL file for reading.
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    ///
    /// - the file cannot be opened;
    /// - the magic bytes do not match `WAL_MAGIC`;
    /// - the header length exceeds 1 MB;
    /// - the header cannot be read or decoded;
    /// - the format version is not `WAL_FORMAT_VERSION`.
    pub fn open(path: impl AsRef<Path>) -> PicanteResult<Self> {
        let path = path.as_ref().to_path_buf();

        let file = File::open(&path).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to open WAL file at {}: {}", path.display(), e),
            })
        })?;

        let mut reader = BufReader::new(file);

        // Read and validate magic bytes
        let mut magic = [0u8; 8];
        reader.read_exact(&mut magic).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to read WAL magic bytes: {}", e),
            })
        })?;

        if &magic != WAL_MAGIC {
            return Err(Arc::new(PicanteError::Cache {
                message: format!(
                    "Invalid WAL magic bytes (expected {:?}, got {:?})",
                    WAL_MAGIC, magic
                ),
            }));
        }

        // Read header length
        let mut header_len_bytes = [0u8; 4];
        reader.read_exact(&mut header_len_bytes).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to read WAL header length: {}", e),
            })
        })?;
        let header_len = u32::from_le_bytes(header_len_bytes) as usize;

        // Sanity check: header should not be larger than 1 MB
        const MAX_HEADER_LEN: usize = 1_000_000;
        if header_len > MAX_HEADER_LEN {
            return Err(Arc::new(PicanteError::Cache {
                message: format!(
                    "WAL header length ({} bytes) exceeds maximum allowed ({} bytes) - file may be corrupted",
                    header_len, MAX_HEADER_LEN
                ),
            }));
        }

        // Read header
        let mut header_bytes = vec![0u8; header_len];
        reader.read_exact(&mut header_bytes).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to read WAL header: {}", e),
            })
        })?;

        let header: WalHeader = facet_postcard::from_slice(&header_bytes).map_err(|e| {
            Arc::new(PicanteError::Decode {
                what: "WAL header",
                message: format!("{}", e),
            })
        })?;

        // Validate format version
        if header.format_version != WAL_FORMAT_VERSION {
            return Err(Arc::new(PicanteError::Cache {
                message: format!(
                    "Unsupported WAL format version (expected {}, got {})",
                    WAL_FORMAT_VERSION, header.format_version
                ),
            }));
        }

        Ok(Self {
            path,
            reader,
            header,
        })
    }

    /// Get the header information.
    pub fn header(&self) -> &WalHeader {
        &self.header
    }

    /// Read the next entry from the WAL.
    ///
    /// Returns `Ok(None)` when the log ends cleanly on a frame boundary.
    ///
    /// # Errors
    ///
    /// Returns an error in any of these cases:
    ///
    /// - the file ends partway through a frame (a truncated length prefix or
    ///   truncated body);
    /// - the length prefix exceeds the sanity limit;
    /// - the entry fails to decode.
    ///
    /// After an error the read position may no longer be on a frame boundary,
    /// so further reads are not meaningful.
    pub fn next_entry(&mut self) -> PicanteResult<Option<WalEntry>> {
        // Peek at the buffer to tell a clean end-of-log (no bytes left) apart
        // from a truncated length prefix. `read_exact` reports both as
        // `UnexpectedEof`, so it cannot make this distinction on its own.
        let at_eof = loop {
            match std::io::BufRead::fill_buf(&mut self.reader) {
                Ok(buf) => break buf.is_empty(),
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    return Err(Arc::new(PicanteError::Cache {
                        message: format!("Failed to read WAL entry length: {}", e),
                    }));
                }
            }
        };
        if at_eof {
            return Ok(None);
        }

        // At least one byte remains, so hitting EOF here means a torn frame.
        let mut entry_len_bytes = [0u8; 4];
        self.reader.read_exact(&mut entry_len_bytes).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to read WAL entry length: {}", e),
            })
        })?;

        let entry_len = u32::from_le_bytes(entry_len_bytes) as usize;

        // Sanity check: entry should not be larger than 100 MB
        // (A reasonable upper bound for a single cache entry)
        const MAX_ENTRY_LEN: usize = 100_000_000;
        if entry_len > MAX_ENTRY_LEN {
            return Err(Arc::new(PicanteError::Cache {
                message: format!(
                    "WAL entry length ({} bytes) exceeds maximum allowed ({} bytes) - file may be corrupted",
                    entry_len, MAX_ENTRY_LEN
                ),
            }));
        }

        // Read entry
        let mut entry_bytes = vec![0u8; entry_len];
        self.reader.read_exact(&mut entry_bytes).map_err(|e| {
            Arc::new(PicanteError::Cache {
                message: format!("Failed to read WAL entry: {}", e),
            })
        })?;

        let entry: WalEntry = facet_postcard::from_slice(&entry_bytes).map_err(|e| {
            Arc::new(PicanteError::Decode {
                what: "WAL entry",
                message: format!("{}", e),
            })
        })?;

        Ok(Some(entry))
    }

    /// Iterate over all remaining entries in the WAL.
    ///
    /// Entries are yielded in file order. The iterator stops after yielding the
    /// first error, because the read position may then be misaligned.
    pub fn entries(&mut self) -> WalEntryIterator<'_> {
        WalEntryIterator {
            reader: self,
            failed: false,
        }
    }
}

/// Iterator over WAL entries, created by [`WalReader::entries`].
pub struct WalEntryIterator<'a> {
    reader: &'a mut WalReader,
    /// Set once `next_entry` has returned an error. Later reads could
    /// interpret payload bytes as length prefixes, so iteration ends there.
    failed: bool,
}

impl<'a> Iterator for WalEntryIterator<'a> {
    type Item = PicanteResult<WalEntry>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.failed {
            return None;
        }
        let item = self.reader.next_entry().transpose();
        if matches!(item, Some(Err(_))) {
            self.failed = true;
        }
        item
    }
}

// Tests will be added in a separate test file that has access to tempfile