Skip to main content

nodedb_columnar/writer/
segment_writer.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! `SegmentWriter` and codec selection for compressed columnar segments.
4
5use std::sync::Arc;
6
7use nodedb_codec::{ColumnCodec, ColumnTypeHint, ResolvedColumnCodec};
8use nodedb_mem::{EngineId, MemoryGovernor};
9use nodedb_types::columnar::{ColumnType, ColumnarSchema};
10
11use crate::error::ColumnarError;
12use crate::format::{ColumnMeta, HEADER_SIZE, SegmentFooter, SegmentHeader};
13use crate::memtable::ColumnData;
14
15use super::block::encode_column_blocks;
16use super::encode::compute_schema_hash;
17
/// Footer profile tag for the plain (default) profile.
pub const PROFILE_PLAIN: u8 = 0;
/// Footer profile tag for the timeseries profile.
pub const PROFILE_TIMESERIES: u8 = 1;
/// Footer profile tag for the spatial profile.
pub const PROFILE_SPATIAL: u8 = 2;
22
23/// Writes a drained memtable into a complete segment byte buffer.
24///
25/// The segment is self-contained: header identifies the format, column
26/// blocks store compressed data, and the footer enables random access to
27/// any column without scanning the entire file.
28pub struct SegmentWriter {
29    profile_tag: u8,
30    /// Optional memory governor for tracking working-buffer allocations.
31    /// `None` in embedded (Lite) deployments that do not configure a governor.
32    governor: Option<Arc<MemoryGovernor>>,
33}
34
35impl SegmentWriter {
36    /// Create a writer for the given profile without a memory governor.
37    pub fn new(profile_tag: u8) -> Self {
38        Self {
39            profile_tag,
40            governor: None,
41        }
42    }
43
44    /// Create a writer for the given profile with a memory governor.
45    pub fn with_governor(profile_tag: u8, governor: Arc<MemoryGovernor>) -> Self {
46        Self {
47            profile_tag,
48            governor: Some(governor),
49        }
50    }
51
52    /// Create a writer for the plain (default) profile.
53    pub fn plain() -> Self {
54        Self::new(PROFILE_PLAIN)
55    }
56
57    /// Encode a drained memtable into a segment byte buffer.
58    ///
59    /// `schema` is the column schema, `columns` are the drained column data,
60    /// `row_count` is the total number of rows.
61    ///
62    /// When `kek` is `Some`, the assembled plaintext segment is wrapped in an
63    /// AES-256-GCM encrypted `SEGC` envelope before being returned. When
64    /// `None`, the raw `NDBS` segment bytes are returned.
65    pub fn write_segment(
66        &self,
67        schema: &ColumnarSchema,
68        columns: &[ColumnData],
69        row_count: usize,
70        kek: Option<&nodedb_wal::crypto::WalEncryptionKey>,
71    ) -> Result<Vec<u8>, ColumnarError> {
72        if row_count == 0 {
73            return Err(ColumnarError::EmptyMemtable);
74        }
75        if columns.len() != schema.columns.len() {
76            return Err(ColumnarError::SchemaMismatch {
77                expected: schema.columns.len(),
78                got: columns.len(),
79            });
80        }
81
82        let mut buf = Vec::new();
83
84        // 1. Write header.
85        buf.extend_from_slice(&SegmentHeader::current().to_bytes());
86
87        // 2. Encode each column's blocks.
88        let _metas_guard = self
89            .governor
90            .as_ref()
91            .map(|g| {
92                g.reserve(
93                    EngineId::Columnar,
94                    columns.len() * std::mem::size_of::<ColumnMeta>(),
95                )
96            })
97            .transpose()?;
98        // no-governor: governed by _metas_guard above; multi-line reserve call splits outside 5-line gate window
99        let mut column_metas = Vec::with_capacity(columns.len());
100
101        for (i, (col_def, col_data)) in schema.columns.iter().zip(columns.iter()).enumerate() {
102            let col_start = buf.len() as u64;
103
104            // Select codec for this column type.
105            let codec = select_codec_for_profile(&col_def.column_type, self.profile_tag);
106
107            // Encode blocks.
108            let block_stats = encode_column_blocks(
109                &mut buf,
110                col_data,
111                &col_def.column_type,
112                codec,
113                row_count,
114                self.governor.as_ref(),
115            )?;
116
117            let col_end = buf.len() as u64;
118
119            // For DictEncoded columns, the codec stored in meta is DeltaFastLanesLz4 (IDs),
120            // and the dictionary strings are stored in the meta for reader reconstruction.
121            let (effective_codec, dictionary) = match col_data {
122                ColumnData::DictEncoded { dictionary, .. } => (
123                    ResolvedColumnCodec::DeltaFastLanesLz4,
124                    Some(dictionary.clone()),
125                ),
126                _ => (codec, None),
127            };
128
129            column_metas.push(ColumnMeta {
130                name: col_def.name.clone(),
131                offset: col_start - HEADER_SIZE as u64,
132                length: col_end - col_start,
133                codec: effective_codec,
134                block_count: block_stats.len() as u32,
135                block_stats,
136                dictionary,
137            });
138
139            let _ = i; // Satisfy clippy about unused index.
140        }
141
142        // 3. Compute schema hash (simple hash of column names + types).
143        let schema_hash = compute_schema_hash(schema);
144
145        // 4. Write footer.
146        let footer = SegmentFooter {
147            schema_hash,
148            column_count: schema.columns.len() as u32,
149            row_count: row_count as u64,
150            profile_tag: self.profile_tag,
151            columns: column_metas,
152        };
153        let footer_bytes = footer.to_bytes()?;
154        buf.extend_from_slice(&footer_bytes);
155
156        // Optionally wrap the plaintext segment in an AES-256-GCM SEGC envelope.
157        if let Some(key) = kek {
158            return crate::encrypt::encrypt_segment(key, &buf);
159        }
160
161        Ok(buf)
162    }
163}
164
165/// Select the best codec for a column type, with profile-aware overrides.
166///
167/// For timeseries profiles (tag=1), Float64 metric columns use Gorilla XOR
168/// encoding when the data is monotonic/slowly-changing. For other profiles,
169/// the standard auto-detection pipeline applies.
170///
171/// Always returns a `ResolvedColumnCodec` — `Auto` is never returned.
172pub fn select_codec_for_profile(col_type: &ColumnType, profile_tag: u8) -> ResolvedColumnCodec {
173    // Timeseries profile: prefer Gorilla for Float64 metrics.
174    if profile_tag == PROFILE_TIMESERIES && matches!(col_type, ColumnType::Float64) {
175        return ResolvedColumnCodec::Gorilla;
176    }
177    // Timeseries profile: delta-of-delta for timestamps (both naive and tz-aware).
178    if profile_tag == PROFILE_TIMESERIES
179        && matches!(col_type, ColumnType::Timestamp | ColumnType::Timestamptz)
180    {
181        return ResolvedColumnCodec::DeltaFastLanesLz4;
182    }
183    select_codec(col_type)
184}
185
186/// Select the best codec for a column type using nodedb-codec's auto-detection.
187///
188/// Always returns a `ResolvedColumnCodec` — `Auto` is consumed here and
189/// never forwarded downstream.
190fn select_codec(col_type: &ColumnType) -> ResolvedColumnCodec {
191    let hint = match col_type {
192        ColumnType::Int64 => ColumnTypeHint::Int64,
193        ColumnType::Float64 => ColumnTypeHint::Float64,
194        ColumnType::Timestamp | ColumnType::Timestamptz | ColumnType::SystemTimestamp => {
195            ColumnTypeHint::Timestamp
196        }
197        ColumnType::String | ColumnType::Geometry | ColumnType::Regex => ColumnTypeHint::String,
198        ColumnType::Bool
199        | ColumnType::Bytes
200        | ColumnType::Decimal { .. }
201        | ColumnType::Uuid
202        | ColumnType::Ulid
203        | ColumnType::Json
204        | ColumnType::Array
205        | ColumnType::Set
206        | ColumnType::Range
207        | ColumnType::Record => {
208            return ResolvedColumnCodec::Lz4;
209        }
210        ColumnType::Duration => ColumnTypeHint::Int64, // i64 microseconds
211        ColumnType::Vector(_) => {
212            return ResolvedColumnCodec::Lz4;
213        }
214        // ColumnType is #[non_exhaustive]; unknown types default to Lz4
215        // general-purpose compression until a dedicated codec is registered.
216        _ => {
217            return ResolvedColumnCodec::Lz4;
218        }
219    };
220    // detect_codec resolves Auto via the type hint and always returns a
221    // concrete codec. Map any unexpected Auto back to a safe default
222    // (Lz4) rather than panicking in library code.
223    nodedb_codec::detect_codec(ColumnCodec::Auto, hint)
224        .try_resolve()
225        .unwrap_or(ResolvedColumnCodec::Lz4)
226}
227
228#[cfg(test)]
229mod tests {
230    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
231    use nodedb_types::value::Value;
232
233    use super::*;
234    use crate::format::{SegmentFooter, SegmentHeader};
235    use crate::memtable::ColumnarMemtable;
236
237    fn analytics_schema() -> ColumnarSchema {
238        ColumnarSchema::new(vec![
239            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
240            ColumnDef::required("name", ColumnType::String),
241            ColumnDef::nullable("score", ColumnType::Float64),
242        ])
243        .expect("valid")
244    }
245
246    // ── ResolvedColumnCodec integration tests ─────────────────────────────────
247
248    /// The writer resolves Auto to a concrete codec before writing.
249    /// The resulting footer must not contain codec byte 0 (Auto discriminant).
250    #[test]
251    fn auto_codec_resolves_to_concrete_before_write() {
252        let schema = ColumnarSchema::new(vec![
253            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
254            ColumnDef::required("name", ColumnType::String),
255            ColumnDef::nullable("score", ColumnType::Float64),
256        ])
257        .expect("valid");
258
259        let mut mt = ColumnarMemtable::new(&schema);
260        for i in 0..10 {
261            mt.append_row(&[
262                Value::Integer(i),
263                Value::String(format!("item_{i}")),
264                Value::Float(i as f64 * 1.5),
265            ])
266            .expect("append");
267        }
268        let (schema, columns, row_count) = mt.drain();
269        let writer = SegmentWriter::plain();
270        let segment = writer
271            .write_segment(&schema, &columns, row_count, None)
272            .expect("write must succeed");
273
274        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
275
276        // None of the column codecs should be the Auto discriminant (byte 0).
277        // All must be concrete, resolved codecs.
278        for col in &footer.columns {
279            // Auto does not exist on ResolvedColumnCodec so the type itself
280            // guarantees this — but we can also serialize and verify the
281            // discriminant byte is not 0.
282            let encoded = zerompk::to_msgpack_vec(&col.codec).expect("serialize");
283            // msgpack c_enum encodes as a single byte when value < 128.
284            // The last byte in the encoded form holds the discriminant.
285            let discriminant_byte = *encoded.last().expect("non-empty");
286            assert_ne!(
287                discriminant_byte, 0,
288                "column '{}' has Auto discriminant (0) on disk — resolve was skipped",
289                col.name
290            );
291        }
292    }
293
294    /// The writer resolves Auto to a sensible non-trivial codec for Int64 columns.
295    #[test]
296    fn auto_codec_int64_resolves_to_non_raw() {
297        use nodedb_codec::ResolvedColumnCodec;
298
299        let schema = ColumnarSchema::new(vec![
300            ColumnDef::required("val", ColumnType::Int64).with_primary_key(),
301        ])
302        .expect("valid");
303
304        let mut mt = ColumnarMemtable::new(&schema);
305        for i in 0..10 {
306            mt.append_row(&[Value::Integer(i)]).expect("append");
307        }
308        let (schema, columns, row_count) = mt.drain();
309        let writer = SegmentWriter::plain();
310        let segment = writer
311            .write_segment(&schema, &columns, row_count, None)
312            .expect("write");
313        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
314
315        // Auto for Int64 should resolve to a delta/int codec, never Raw or Auto.
316        let codec = footer.columns[0].codec;
317        assert_ne!(
318            codec,
319            ResolvedColumnCodec::Raw,
320            "Int64 should not resolve to Raw"
321        );
322    }
323
324    #[test]
325    fn write_segment_roundtrip() {
326        let schema = analytics_schema();
327        let mut mt = ColumnarMemtable::new(&schema);
328
329        for i in 0..100 {
330            mt.append_row(&[
331                Value::Integer(i),
332                Value::String(format!("user_{i}")),
333                if i % 3 == 0 {
334                    Value::Null
335                } else {
336                    Value::Float(i as f64 * 0.25)
337                },
338            ])
339            .expect("append");
340        }
341
342        let (schema, columns, row_count) = mt.drain();
343        let writer = SegmentWriter::plain();
344        let segment = writer
345            .write_segment(&schema, &columns, row_count, None)
346            .expect("write");
347
348        // Verify header.
349        let header = SegmentHeader::from_bytes(&segment).expect("valid header");
350        assert_eq!(header.magic, *b"NDBS");
351        assert_eq!(header.version_major, 1);
352
353        // Verify footer.
354        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
355        assert_eq!(footer.column_count, 3);
356        assert_eq!(footer.row_count, 100);
357        assert_eq!(footer.profile_tag, PROFILE_PLAIN);
358        assert_eq!(footer.columns.len(), 3);
359
360        // Verify column metadata.
361        assert_eq!(footer.columns[0].name, "id");
362        assert_eq!(footer.columns[1].name, "name");
363        assert_eq!(footer.columns[2].name, "score");
364
365        // Each column should have 1 block (100 rows < BLOCK_SIZE=1024).
366        assert_eq!(footer.columns[0].block_count, 1);
367        assert_eq!(footer.columns[0].block_stats[0].row_count, 100);
368
369        // id: min=0, max=99.
370        assert_eq!(footer.columns[0].block_stats[0].min, 0.0);
371        assert_eq!(footer.columns[0].block_stats[0].max, 99.0);
372        assert_eq!(footer.columns[0].block_stats[0].null_count, 0);
373
374        // score: 34 nulls (every 3rd row), min=0.25 (row 1), max=99*0.25=24.75 (row 99).
375        assert_eq!(footer.columns[2].block_stats[0].null_count, 34);
376    }
377
378    #[test]
379    fn write_segment_multi_block() {
380        let schema =
381            ColumnarSchema::new(vec![ColumnDef::required("x", ColumnType::Int64)]).expect("valid");
382
383        let mut mt = ColumnarMemtable::new(&schema);
384        for i in 0..2500 {
385            mt.append_row(&[Value::Integer(i)]).expect("append");
386        }
387
388        let (schema, columns, row_count) = mt.drain();
389        let writer = SegmentWriter::plain();
390        let segment = writer
391            .write_segment(&schema, &columns, row_count, None)
392            .expect("write");
393
394        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid footer");
395        assert_eq!(footer.row_count, 2500);
396
397        // 2500 rows / 1024 = 3 blocks (1024 + 1024 + 452).
398        assert_eq!(footer.columns[0].block_count, 3);
399        assert_eq!(footer.columns[0].block_stats[0].row_count, 1024);
400        assert_eq!(footer.columns[0].block_stats[1].row_count, 1024);
401        assert_eq!(footer.columns[0].block_stats[2].row_count, 452);
402
403        // Block 0: min=0, max=1023.
404        assert_eq!(footer.columns[0].block_stats[0].min, 0.0);
405        assert_eq!(footer.columns[0].block_stats[0].max, 1023.0);
406        // Block 2: min=2048, max=2499.
407        assert_eq!(footer.columns[0].block_stats[2].min, 2048.0);
408        assert_eq!(footer.columns[0].block_stats[2].max, 2499.0);
409    }
410
411    #[test]
412    fn write_segment_empty_rejected() {
413        let schema = analytics_schema();
414        let mt = ColumnarMemtable::new(&schema);
415        let (schema, columns, row_count) = {
416            let mut m = mt;
417            m.drain()
418        };
419        let writer = SegmentWriter::plain();
420        assert!(matches!(
421            writer.write_segment(&schema, &columns, row_count, None),
422            Err(ColumnarError::EmptyMemtable)
423        ));
424    }
425
426    #[test]
427    fn block_stats_predicate_pushdown() {
428        let schema = analytics_schema();
429        let mut mt = ColumnarMemtable::new(&schema);
430
431        for i in 0..50 {
432            mt.append_row(&[
433                Value::Integer(i + 100),
434                Value::String(format!("item_{i}")),
435                Value::Float(i as f64 + 10.0),
436            ])
437            .expect("append");
438        }
439
440        let (schema, columns, row_count) = mt.drain();
441        let writer = SegmentWriter::plain();
442        let segment = writer
443            .write_segment(&schema, &columns, row_count, None)
444            .expect("write");
445        let footer = SegmentFooter::from_segment_tail(&segment).expect("valid");
446
447        use crate::predicate::ScanPredicate;
448
449        let id_stats = &footer.columns[0].block_stats[0];
450        // id: min=100, max=149.
451        assert!(ScanPredicate::gt(0, 200.0).can_skip_block(id_stats)); // WHERE id > 200 → skip.
452        assert!(!ScanPredicate::gt(0, 120.0).can_skip_block(id_stats)); // WHERE id > 120 → cannot skip.
453        assert!(ScanPredicate::lt(0, 50.0).can_skip_block(id_stats)); // WHERE id < 50 → skip.
454        assert!(ScanPredicate::eq(0, 200.0).can_skip_block(id_stats)); // WHERE id = 200 → skip.
455        assert!(!ScanPredicate::eq(0, 125.0).can_skip_block(id_stats)); // WHERE id = 125 → cannot skip.
456    }
457
458    #[test]
459    fn string_block_stats_zone_map() {
460        // Write a segment with known string values, then verify str_min/str_max.
461        let schema = ColumnarSchema::new(vec![ColumnDef::required("tag", ColumnType::String)])
462            .expect("valid");
463
464        let mut mt = ColumnarMemtable::new(&schema);
465        // Insert > 16 distinct values to trigger bloom filter construction.
466        // Lexicographic order: apple < banana < cherry < date (first/last matter for zone map).
467        let values: Vec<String> = (0..20).map(|i| format!("item_{i:02}")).collect();
468        for name in &values {
469            mt.append_row(&[Value::String(name.clone())])
470                .expect("append");
471        }
472        // Add known boundary values for zone-map assertions.
473        mt.append_row(&[Value::String("apple".into())])
474            .expect("append");
475        mt.append_row(&[Value::String("date".into())])
476            .expect("append");
477
478        let (schema, columns, row_count) = mt.drain();
479        let writer = SegmentWriter::plain();
480        let segment = writer
481            .write_segment(&schema, &columns, row_count, None)
482            .expect("write");
483        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
484
485        let stats = &footer.columns[0].block_stats[0];
486        assert!(stats.str_min.is_some(), "str_min should be populated");
487        assert!(stats.str_max.is_some(), "str_max should be populated");
488        // "apple" is lex smallest, "item_19" is lex largest (> "date").
489        assert_eq!(stats.str_min.as_deref(), Some("apple"));
490        assert_eq!(stats.str_max.as_deref(), Some("item_19"));
491
492        // Bloom filter should be present (>16 distinct values).
493        assert!(
494            stats.bloom.is_some(),
495            "bloom should be populated for >16 distinct values"
496        );
497
498        use crate::predicate::ScanPredicate;
499
500        // WHERE tag = "aaa" → below "apple" → skip.
501        assert!(ScanPredicate::str_eq(0, "aaa").can_skip_block(stats));
502        // WHERE tag = "zzz" → above "item_19" → skip.
503        assert!(ScanPredicate::str_eq(0, "zzz").can_skip_block(stats));
504        // WHERE tag = "date" → in range [apple, item_19], inserted in bloom → cannot skip.
505        assert!(!ScanPredicate::str_eq(0, "date").can_skip_block(stats));
506        // WHERE tag > "item_19" → smax ≤ value → skip.
507        assert!(ScanPredicate::str_gt(0, "item_19").can_skip_block(stats));
508        // WHERE tag < "apple" → smin ≥ value → skip.
509        assert!(ScanPredicate::str_lt(0, "apple").can_skip_block(stats));
510    }
511
512    /// timestamps in the year-2300+ microsecond range (far above 2^53)
513    /// must be recorded losslessly in `min_i64`/`max_i64` and predicate
514    /// pushdown must not false-skip a block that contains the target value.
515    #[test]
516    fn timestamp_large_value_roundtrip() {
517        use crate::predicate::ScanPredicate;
518
519        let schema = ColumnarSchema::new(vec![
520            ColumnDef::required("ts", ColumnType::Timestamp).with_primary_key(),
521        ])
522        .expect("valid schema");
523
524        // Year-2300 in microseconds since epoch.
525        // 2300-01-01T00:00:00Z ≈ 10_413_792_000_000_000 µs
526        // These values are well above 2^53 = 9_007_199_254_740_992.
527        let base: i64 = 10_413_792_000_000_000;
528        let target = base + 500_000; // half a second later
529
530        let mut mt = ColumnarMemtable::new(&schema);
531        for delta in 0..10i64 {
532            mt.append_row(&[Value::Integer(base + delta * 100_000)])
533                .expect("append");
534        }
535
536        let (schema, columns, row_count) = mt.drain();
537        let segment = SegmentWriter::plain()
538            .write_segment(&schema, &columns, row_count, None)
539            .expect("write");
540        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
541
542        let stats = &footer.columns[0].block_stats[0];
543
544        // Exact i64 fields must be populated.
545        assert!(
546            stats.min_i64.is_some(),
547            "min_i64 must be set for timestamp columns"
548        );
549        assert!(
550            stats.max_i64.is_some(),
551            "max_i64 must be set for timestamp columns"
552        );
553        assert_eq!(stats.min_i64.unwrap(), base);
554        assert_eq!(stats.max_i64.unwrap(), base + 9 * 100_000);
555
556        // Predicate: ts = target (base + 500_000) → in [base, base+900_000] → must NOT skip.
557        assert!(
558            !ScanPredicate::eq_i64(0, target).can_skip_block(stats),
559            "must not skip: target={target} is within the block range"
560        );
561
562        // Predicate: ts = base - 1 → below min → must skip.
563        assert!(
564            ScanPredicate::eq_i64(0, base - 1).can_skip_block(stats),
565            "must skip: base-1 is below block min"
566        );
567
568        // The f64 path is broken for these values (min, max, target all round
569        // to the same f64 or nearby indistinguishable values).
570        let min_f64 = base as f64;
571        let target_f64 = target as f64;
572        let max_f64 = (base + 9 * 100_000) as f64;
573        // Verify the f64 representation is unreliable for this range.
574        // (min_f64 == target_f64 if the gap < ULP, which it is at this scale.)
575        let _ = (min_f64, target_f64, max_f64); // suppress unused warnings
576    }
577
578    #[test]
579    fn integer_block_stats_have_exact_i64_fields() {
580        // Verify that Int64 columns also populate min_i64/max_i64.
581        let schema = ColumnarSchema::new(vec![
582            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
583        ])
584        .expect("valid");
585
586        let mut mt = ColumnarMemtable::new(&schema);
587        for i in 0..5i64 {
588            mt.append_row(&[Value::Integer(i64::MAX - 4 + i)])
589                .expect("append");
590        }
591
592        let (schema, columns, row_count) = mt.drain();
593        let segment = SegmentWriter::plain()
594            .write_segment(&schema, &columns, row_count, None)
595            .expect("write");
596        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
597
598        let stats = &footer.columns[0].block_stats[0];
599        assert_eq!(stats.min_i64, Some(i64::MAX - 4));
600        assert_eq!(stats.max_i64, Some(i64::MAX));
601
602        // eq_i64 must not skip for a value in the middle.
603        use crate::predicate::ScanPredicate;
604        assert!(!ScanPredicate::eq_i64(0, i64::MAX - 2).can_skip_block(stats));
605        // eq_i64 must skip for a value below min.
606        assert!(ScanPredicate::eq_i64(0, i64::MAX - 10).can_skip_block(stats));
607    }
608
609    #[test]
610    fn string_block_stats_bloom_rejects_absent_value() {
611        let schema = ColumnarSchema::new(vec![ColumnDef::required("label", ColumnType::String)])
612            .expect("valid");
613
614        let mut mt = ColumnarMemtable::new(&schema);
615        // Insert > 16 distinct values to trigger bloom construction.
616        let values: Vec<String> = (0..20).map(|i| format!("val_{i:02}")).collect();
617        for name in &values {
618            mt.append_row(&[Value::String(name.clone())])
619                .expect("append");
620        }
621        // Add known values for bloom assertions.
622        mt.append_row(&[Value::String("alpha".into())])
623            .expect("append");
624        mt.append_row(&[Value::String("beta".into())])
625            .expect("append");
626        mt.append_row(&[Value::String("gamma".into())])
627            .expect("append");
628
629        let (schema, columns, row_count) = mt.drain();
630        let segment = SegmentWriter::plain()
631            .write_segment(&schema, &columns, row_count, None)
632            .expect("write");
633        let footer = SegmentFooter::from_segment_tail(&segment).expect("footer");
634        let stats = &footer.columns[0].block_stats[0];
635
636        use crate::predicate::{ScanPredicate, bloom_may_contain};
637
638        let bloom = stats
639            .bloom
640            .as_ref()
641            .expect("bloom present for >16 distinct");
642        assert!(bloom_may_contain(bloom, "alpha"));
643        assert!(bloom_may_contain(bloom, "beta"));
644        assert!(bloom_may_contain(bloom, "gamma"));
645
646        // "delta" was not inserted. If bloom says absent, the predicate skips.
647        let delta_absent = !bloom_may_contain(bloom, "delta");
648        if delta_absent {
649            // "delta" is in [alpha, val_19] range → only bloom can skip this.
650            assert!(ScanPredicate::str_eq(0, "delta").can_skip_block(stats));
651        }
652    }
653}