merutable 0.0.1

Embeddable single-table engine: row + columnar Parquet with Iceberg-compatible metadata
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
//! Read path: point lookup (3-stop) and range scan via K-way merge.
//!
//! ## Point lookup algorithm
//!
//! 1. **Memtable** (active → immutable queue, newest first)
//!    → return immediately if found.
//! 2. **L0 files** (ALL checked, sorted by `seq_max` DESC — can overlap)
//!    → bloom filter gate first; first hit wins (files are already sorted
//!    newest-first by `Manifest::to_version`).
//! 3. **L1..LN** (binary search per level by `key_max` — non-overlapping)
//!    → bloom filter gate first.
//!
//! Deletion Vectors are loaded from their Puffin blob at the offset/length
//! recorded in the manifest and passed through to `ParquetReader::get`.
//!
//! ## Range scan
//!
//! Collect sorted rows from every memtable and every live Parquet file,
//! then do a single-pass dedup by user_key (PK ASC, seq DESC). A tombstone
//! at the top of a user_key group drops the key. File-level DVs filter
//! physically-deleted rows before dedup.

use std::{path::Path, sync::Arc};

use crate::iceberg::{version::DataFileMeta, DeletionVector};
use crate::memtable::iterator::MemEntry;
use crate::parquet::reader::ParquetReader;
use crate::types::{
    key::InternalKey,
    level::Level,
    schema::TableSchema,
    sequence::{OpType, SeqNum},
    value::{FieldValue, Row},
    MeruError, Result,
};
use bytes::Bytes;
use roaring::RoaringBitmap;
use tracing::{debug, instrument, trace};

use crate::engine::engine::MeruEngine;

// ── File open helper ────────────────────────────────────────────────────────

/// Synchronously open a Parquet file from disk and, when the manifest
/// records a Deletion Vector, load the exact DV blob byte range from the
/// companion Puffin file.
///
/// The returned `RoaringBitmap` is the set of **file-global** row positions
/// that were logically deleted by a subsequent partial compaction. Readers
/// MUST pass it to `ParquetReader::get`/`scan` or else deleted rows will
/// silently resurrect on read.
/// Open a Parquet data file from disk and, if the manifest records a
/// Deletion Vector for it, load that DV's blob from the companion Puffin
/// file at the recorded offset/length.
///
/// The returned bitmap (when present) holds **file-global** row positions
/// logically deleted by a later partial compaction. Callers must thread it
/// into `ParquetReader::get`/`scan`; otherwise deleted rows reappear.
///
/// # Errors
/// - `MeruError::Io` if either file cannot be read.
/// - `MeruError::Corruption` if the DV coordinates overflow, point past the
///   end of the Puffin file, or are only partially populated.
fn open_file(
    base: &Path,
    file: &DataFileMeta,
    schema: Arc<TableSchema>,
) -> Result<(ParquetReader<Bytes>, Option<RoaringBitmap>)> {
    let raw = std::fs::read(base.join(&file.path)).map_err(MeruError::Io)?;
    let reader = ParquetReader::open(Bytes::from(raw), schema)?;

    let dv = match (&file.dv_path, file.dv_offset, file.dv_length) {
        // No DV recorded — nothing to filter on read.
        (None, None, None) => None,
        // Full coordinate triple: slice the exact byte range out of the
        // Puffin companion file and deserialize the roaring bitmap.
        (Some(dv_path), Some(offset), Some(length)) => {
            let puffin = std::fs::read(base.join(dv_path)).map_err(MeruError::Io)?;
            let lo = offset as usize;
            let hi = lo
                .checked_add(length as usize)
                .ok_or_else(|| MeruError::Corruption("DV offset+length overflow".into()))?;
            if hi > puffin.len() {
                return Err(MeruError::Corruption(format!(
                    "DV blob out of range: path={dv_path} offset={offset} length={length} puffin_len={}",
                    puffin.len()
                )));
            }
            Some(DeletionVector::from_puffin_blob(&puffin[lo..hi])?.bitmap().clone())
        }
        // A partially-populated triple can only come from a corrupt manifest.
        _ => {
            return Err(MeruError::Corruption(format!(
                "inconsistent DV coords on file {}: dv_path={:?} dv_offset={:?} dv_length={:?}",
                file.path, file.dv_path, file.dv_offset, file.dv_length
            )));
        }
    };

    Ok((reader, dv))
}

// ── Point lookup ─────────────────────────────────────────────────────────────

/// 3-stop point lookup: memtable → L0 → L1..LN.
/// 3-stop point lookup: memtable → L0 → L1..LN.
///
/// # Arguments
/// * `pk_values` — primary-key field values identifying the row to fetch.
///
/// # Returns
/// * `Ok(Some(row))` when the newest version visible at the engine's
///   current read sequence is a Put.
/// * `Ok(None)` when the key is absent or its newest visible version is a
///   tombstone (Delete).
///
/// # Errors
/// Surfaces key-encoding, file-open (I/O / corruption), and row-decode
/// errors; nothing is silently defaulted (Issue #12).
#[instrument(skip(engine), fields(op = "point_lookup"))]
pub fn point_lookup(engine: &MeruEngine, pk_values: &[FieldValue]) -> Result<Option<Row>> {
    let read_seq = engine.read_seq();

    // Encode user key bytes for the lookup.
    // The seq/op in this InternalKey are placeholders — only the user-key
    // prefix is used below.
    let ikey = InternalKey::encode(pk_values, read_seq, OpType::Put, &engine.schema)?;
    let user_key_bytes = ikey.user_key_bytes().to_vec();

    // Stop 1: Memtable.
    if let Some(entry) = engine.memtable.get(&user_key_bytes, read_seq) {
        if entry.op_type == OpType::Delete {
            return Ok(None); // tombstone
        }
        // Issue #12: decode errors are Corruption, not silent defaults.
        let row = crate::engine::codec::decode_row(&entry.value)?;
        trace!(source = "memtable", "cache hit");
        return Ok(Some(row));
    }

    // Stop 1.5: Row cache (between memtable and file I/O).
    // Cached tombstones short-circuit to None just like memtable ones.
    if let Some(ref cache) = engine.row_cache {
        if let Some(entry) = cache.get(&user_key_bytes) {
            if entry.op_type == OpType::Delete {
                return Ok(None);
            }
            trace!(source = "row_cache", "cache hit");
            return Ok(Some(entry.row));
        }
    }

    // Cache race fix: snapshot the generation BEFORE reading from disk.
    // Any concurrent write that invalidates the cache advances the
    // generation; the `insert_if_fresh` call below refuses to install
    // the disk-sourced value if the generation has moved on — preventing
    // a stale-cache-survives-memtable-flush scenario.
    let cache_gen = engine.row_cache.as_ref().map(|c| c.snapshot_generation());

    // Pin the current version snapshot: GC will not delete any file
    // our `version` still references until `_pin` drops at function
    // return. Fixes BUG-0007..0013 where long integrity reads hit
    // `IO NotFound` because GC ran mid-read. The guard owns a clone
    // of the Version `Arc` so the caller uses a stable snapshot for
    // every file opened below.
    let (_pin, version) = engine.pin_current_snapshot();
    let base = engine.catalog.base_path();

    // Stop 2: L0 files. `Manifest::to_version` pre-sorts L0 by `seq_max`
    // DESC so the first file that returns a hit is guaranteed to carry the
    // newest visible version of `user_key_bytes`.
    for file in version.files_at(Level(0)) {
        // Cheap key-range gate before paying the file-open cost.
        if !range_contains(&file.meta.key_min, &file.meta.key_max, &user_key_bytes) {
            continue;
        }
        let (reader, dv) = open_file(base, file, engine.schema.clone())?;
        if let Some((hit_ikey, row)) = reader.get(&user_key_bytes, read_seq, dv.as_ref())? {
            // Populate cache before returning — only if no concurrent
            // invalidation raced with this read. Tombstones are cached
            // too, so Stop 1.5 can negative-hit next time.
            if let (Some(ref cache), Some(gen)) = (&engine.row_cache, cache_gen) {
                cache.insert_if_fresh(
                    user_key_bytes.clone(),
                    crate::engine::cache::CacheEntry {
                        op_type: hit_ikey.op_type,
                        row: row.clone(),
                    },
                    gen,
                );
            }
            if hit_ikey.op_type == OpType::Delete {
                return Ok(None);
            }
            debug!(source = "L0", file = %file.path, "point lookup hit");
            return Ok(Some(row));
        }
    }

    // Stop 3: L1..LN — binary search for the covering file per level.
    // Levels ≥1 are non-overlapping, so at most one file per level can
    // contain the key.
    let max_level = version.max_level();
    for lvl in 1..=max_level.0 {
        let level = Level(lvl);
        let Some(file) = version.find_file_for_key(level, &user_key_bytes) else {
            continue;
        };
        let (reader, dv) = open_file(base, file, engine.schema.clone())?;
        if let Some((hit_ikey, row)) = reader.get(&user_key_bytes, read_seq, dv.as_ref())? {
            // Same freshness-gated cache insert as the L0 path.
            if let (Some(ref cache), Some(gen)) = (&engine.row_cache, cache_gen) {
                cache.insert_if_fresh(
                    user_key_bytes.clone(),
                    crate::engine::cache::CacheEntry {
                        op_type: hit_ikey.op_type,
                        row: row.clone(),
                    },
                    gen,
                );
            }
            if hit_ikey.op_type == OpType::Delete {
                return Ok(None);
            }
            debug!(source = %format!("L{}", lvl), file = %file.path, "point lookup hit");
            return Ok(Some(row));
        }
    }

    trace!("point lookup miss");
    Ok(None)
}

/// Issue #29 Phase 2c: point-lookup that takes a pre-encoded
/// user_key + explicit read_seq. Used by the change-feed pre-image
/// path to resolve "what did the row look like at `delete_seq - 1`"
/// without redoing the PK encoding (the change feed already carries
/// `pk_bytes` from the op record).
///
/// Returns:
/// - `Some(row)` if the key was live (last op `<= read_seq` was a Put).
/// - `None` if the key was absent or tombstoned at `read_seq`.
///
/// Does NOT update the row cache — pre-image lookups happen during
/// change-feed draining and shouldn't perturb the steady-state hit
/// pattern that `point_lookup` caches against.
/// Issue #29 Phase 2c: point lookup with a pre-encoded `user_key` and an
/// explicit `read_seq`. The change-feed pre-image path uses this to answer
/// "what did the row look like at `delete_seq - 1`" without re-encoding the
/// PK (the op record already carries `pk_bytes`).
///
/// Returns `Some(row)` if the newest op at or below `read_seq` was a Put,
/// `None` if the key was absent or tombstoned at that sequence.
///
/// Deliberately does NOT touch the row cache: pre-image lookups run during
/// change-feed draining and must not perturb the hit pattern that
/// `point_lookup` caches against.
pub fn point_lookup_at_seq(
    engine: &MeruEngine,
    user_key_bytes: &[u8],
    read_seq: SeqNum,
) -> Result<Option<Row>> {
    // Stop 1: memtable (active + immutable queue, newest first).
    if let Some(hit) = engine.memtable.get(user_key_bytes, read_seq) {
        return match hit.op_type {
            OpType::Delete => Ok(None),
            _ => Ok(Some(crate::engine::codec::decode_row(&hit.value)?)),
        };
    }

    // Pin the version snapshot so GC cannot delete files out from under
    // us while we read.
    let (_pin, version) = engine.pin_current_snapshot();
    let base = engine.catalog.base_path();

    // Stop 2: L0 files (overlapping; pre-sorted by seq_max DESC, so the
    // first hit is the newest visible version).
    for file in version.files_at(Level(0)) {
        if range_contains(&file.meta.key_min, &file.meta.key_max, user_key_bytes) {
            let (reader, dv) = open_file(base, file, engine.schema.clone())?;
            match reader.get(user_key_bytes, read_seq, dv.as_ref())? {
                Some((k, _)) if k.op_type == OpType::Delete => return Ok(None),
                Some((_, row)) => return Ok(Some(row)),
                None => {}
            }
        }
    }

    // Stop 3: L1..LN (non-overlapping; at most one candidate per level,
    // found by binary search on key_max).
    for lvl in 1..=version.max_level().0 {
        if let Some(file) = version.find_file_for_key(Level(lvl), user_key_bytes) {
            let (reader, dv) = open_file(base, file, engine.schema.clone())?;
            match reader.get(user_key_bytes, read_seq, dv.as_ref())? {
                Some((k, _)) if k.op_type == OpType::Delete => return Ok(None),
                Some((_, row)) => return Ok(Some(row)),
                None => {}
            }
        }
    }
    Ok(None)
}

/// Inclusive membership test of `probe` against a file's `[key_min, key_max]`
/// range. An empty bound means "unbounded" on that side.
fn range_contains(key_min: &[u8], key_max: &[u8], probe: &[u8]) -> bool {
    let above_min = key_min.is_empty() || probe >= key_min;
    let below_max = key_max.is_empty() || probe <= key_max;
    above_min && below_max
}

// ── Range scan ───────────────────────────────────────────────────────────────

/// Range scan with K-way merge across memtables and every live Parquet file.
/// Dedups by `user_key`, drops tombstones, and honors Deletion Vectors.
/// Range scan with K-way merge across memtables and every live Parquet file.
/// Dedups by `user_key`, drops tombstones, and honors Deletion Vectors.
///
/// # Arguments
/// * `start_pk` — inclusive lower bound on the primary key; `None` = open.
/// * `end_pk` — exclusive upper bound on the primary key; `None` = open.
///
/// # Returns
/// The live `(InternalKey, Row)` pairs in the half-open range
/// `[start_pk, end_pk)`, sorted by user key ascending, one (newest) version
/// per key.
///
/// # Errors
/// Surfaces key-encoding, file-open (I/O / corruption), and row-decode
/// errors from any source being merged.
#[instrument(skip(engine), fields(op = "range_scan"))]
pub fn range_scan(
    engine: &MeruEngine,
    start_pk: Option<&[FieldValue]>,
    end_pk: Option<&[FieldValue]>,
) -> Result<Vec<(InternalKey, Row)>> {
    // Issue #37 fix: pin BEFORE the memtable harvest. Pre-#37 this
    // pin happened after the encode + memtable-decode work (tens of
    // ms under heavy load), leaving a TOCTOU window where a
    // concurrent compaction-GC could delete a Parquet file still
    // referenced by the `Version` this scan was about to open.
    // Pinning at the top of the function — before ANY work that
    // might depend on the Version — closes the window: GC sees our
    // pin on its first `min_pinned_snapshot()` read after we
    // register, and the pinned `Version`'s files are guaranteed to
    // remain on disk until `_pin` drops at function return.
    let (_pin, version) = engine.pin_current_snapshot();
    let read_seq = engine.read_seq();

    // Encode start/end user key bytes. The seq/op passed to `encode` are
    // placeholders — only the user-key prefix bytes are kept.
    let start_bytes = start_pk
        .map(|pk| {
            InternalKey::encode(pk, read_seq, OpType::Put, &engine.schema)
                .map(|ik| ik.user_key_bytes().to_vec())
        })
        .transpose()?;
    let end_bytes = end_pk
        .map(|pk| {
            InternalKey::encode(pk, read_seq, OpType::Put, &engine.schema)
                .map(|ik| ik.user_key_bytes().to_vec())
        })
        .transpose()?;

    // Harvest every candidate `(InternalKey, Row, op_type)` tuple into a
    // single buffer. We do a single sort+dedup pass at the end rather than
    // an incremental k-way merge: simpler to get right, still O(N log N),
    // and N is bounded by the active working set.
    let mut harvest: Vec<(InternalKey, Row, OpType)> = Vec::new();

    // 1. Memtable snapshots.
    let mem_snapshots = engine.memtable.snapshot_entries(read_seq);
    let mut mem_all: Vec<MemEntry> = Vec::new();
    for s in mem_snapshots {
        mem_all.extend(s);
    }
    for entry in &mem_all {
        // Range gate — skip rows outside the requested range early.
        // Start bound is inclusive, end bound is exclusive.
        let uk = entry.user_key.as_ref();
        if let Some(ref start) = start_bytes {
            if uk < start.as_slice() {
                continue;
            }
        }
        if let Some(ref end) = end_bytes {
            if uk >= end.as_slice() {
                continue;
            }
        }

        // Rebuild the InternalKey from wire bytes (user_key ++ tag).
        // The tag packs an inverted seq (so bigger seq sorts first under
        // byte-ascending order) in the high bits and the op type in the
        // low byte.
        let tag = (crate::types::sequence::SEQNUM_MAX.0 - entry.seq.0) << 8
            | (entry.entry.op_type as u64);
        let mut wire = Vec::with_capacity(uk.len() + 8);
        wire.extend_from_slice(uk);
        wire.extend_from_slice(&tag.to_be_bytes());
        let ikey = InternalKey::decode(&wire, &engine.schema)?;

        // Issue #12: decode errors surface — the scan aborts rather
        // than silently including an empty phantom row.
        // Tombstones carry a default (empty) Row placeholder.
        let row: Row = if entry.entry.op_type == OpType::Put && !entry.entry.value.is_empty() {
            crate::engine::codec::decode_row(&entry.entry.value)?
        } else {
            Row::default()
        };
        harvest.push((ikey, row, entry.entry.op_type));
    }

    // 2. Every live Parquet file at every level. `version` and
    // `_pin` were acquired at the top of the function (#37 fix);
    // GC cannot delete any file the pinned `version` references
    // until we return and `_pin` drops.
    let base = engine.catalog.base_path();
    let max_level = version.max_level();
    for lvl in 0..=max_level.0 {
        let level = Level(lvl);
        for file in version.files_at(level) {
            // Skip files whose key range doesn't overlap the scan range.
            // Empty min/max bounds are treated as unbounded.
            if let Some(ref start) = start_bytes {
                if !file.meta.key_max.is_empty() && file.meta.key_max.as_slice() < start.as_slice()
                {
                    continue;
                }
            }
            if let Some(ref end) = end_bytes {
                if !file.meta.key_min.is_empty() && file.meta.key_min.as_slice() >= end.as_slice() {
                    continue;
                }
            }

            let (reader, dv) = open_file(base, file, engine.schema.clone())?;
            // Ask the reader for rows in the requested range, already
            // DV-filtered and MVCC-gated at `read_seq`. `scan` dedups
            // within a single file; cross-file dedup happens below.
            let file_rows = reader.scan(
                start_bytes.as_deref(),
                end_bytes.as_deref(),
                read_seq,
                dv.as_ref(),
            )?;
            for (ikey, row) in file_rows {
                let op = ikey.op_type;
                harvest.push((ikey, row, op));
            }
        }
    }

    // 3. Global sort: (user_key ASC, seq DESC).
    harvest.sort_by(|a, b| a.0.cmp(&b.0));

    // 4. Dedup: for each user_key, keep the topmost entry (highest seq).
    // Drop keys whose topmost entry is a tombstone.
    let mut results: Vec<(InternalKey, Row)> = Vec::new();
    let mut last_uk: Option<Vec<u8>> = None;

    for (ikey, row, op) in harvest {
        let uk = ikey.user_key_bytes().to_vec();
        if let Some(ref last) = last_uk {
            if *last == uk {
                continue; // older version of same key
            }
        }
        last_uk = Some(uk);

        if op == OpType::Delete {
            continue;
        }
        results.push((ikey, row));
    }

    debug!(result_count = results.len(), "range scan complete");
    Ok(results)
}