1mod block;
8mod encode;
9mod stats;
10
11use nodedb_codec::{ColumnCodec, ColumnTypeHint};
12use nodedb_types::columnar::{ColumnType, ColumnarSchema};
13
14use crate::error::ColumnarError;
15use crate::format::{ColumnMeta, HEADER_SIZE, SegmentFooter, SegmentHeader};
16use crate::memtable::ColumnData;
17
18use self::block::encode_column_blocks;
19use self::encode::compute_schema_hash;
20
/// Profile tag for the plain profile; this is the tag used by
/// [`SegmentWriter::plain`].
pub const PROFILE_PLAIN: u8 = 0;
/// Profile tag for the time-series profile. Under this tag
/// [`select_codec_for_profile`] picks `Gorilla` for `Float64` columns and
/// `DeltaFastLanesLz4` for `Timestamp` columns.
pub const PROFILE_TIMESERIES: u8 = 1;
/// Profile tag for the spatial profile. No codec override is keyed on this
/// tag in this file.
pub const PROFILE_SPATIAL: u8 = 2;
25
/// Writes drained columns into a single in-memory segment buffer.
///
/// The writer itself only carries the profile tag; all input data is passed
/// to [`SegmentWriter::write_segment`] on each call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SegmentWriter {
    /// Profile tag recorded in the segment footer and consulted when
    /// selecting a codec for each column (one of the `PROFILE_*` constants).
    profile_tag: u8,
}
34
35impl SegmentWriter {
36 pub fn new(profile_tag: u8) -> Self {
38 Self { profile_tag }
39 }
40
41 pub fn plain() -> Self {
43 Self::new(PROFILE_PLAIN)
44 }
45
46 pub fn write_segment(
51 &self,
52 schema: &ColumnarSchema,
53 columns: &[ColumnData],
54 row_count: usize,
55 ) -> Result<Vec<u8>, ColumnarError> {
56 if row_count == 0 {
57 return Err(ColumnarError::EmptyMemtable);
58 }
59 if columns.len() != schema.columns.len() {
60 return Err(ColumnarError::SchemaMismatch {
61 expected: schema.columns.len(),
62 got: columns.len(),
63 });
64 }
65
66 let mut buf = Vec::new();
67
68 buf.extend_from_slice(&SegmentHeader::current().to_bytes());
70
71 let mut column_metas = Vec::with_capacity(columns.len());
73
74 for (i, (col_def, col_data)) in schema.columns.iter().zip(columns.iter()).enumerate() {
75 let col_start = buf.len() as u64;
76
77 let codec = select_codec_for_profile(&col_def.column_type, self.profile_tag);
79
80 let block_stats =
82 encode_column_blocks(&mut buf, col_data, &col_def.column_type, codec, row_count)?;
83
84 let col_end = buf.len() as u64;
85
86 let (effective_codec, dictionary) = match col_data {
89 ColumnData::DictEncoded { dictionary, .. } => {
90 (ColumnCodec::DeltaFastLanesLz4, Some(dictionary.clone()))
91 }
92 _ => (codec, None),
93 };
94
95 column_metas.push(ColumnMeta {
96 name: col_def.name.clone(),
97 offset: col_start - HEADER_SIZE as u64,
98 length: col_end - col_start,
99 codec: effective_codec,
100 block_count: block_stats.len() as u32,
101 block_stats,
102 dictionary,
103 });
104
105 let _ = i; }
107
108 let schema_hash = compute_schema_hash(schema);
110
111 let footer = SegmentFooter {
113 schema_hash,
114 column_count: schema.columns.len() as u32,
115 row_count: row_count as u64,
116 profile_tag: self.profile_tag,
117 columns: column_metas,
118 };
119 let footer_bytes = footer.to_bytes()?;
120 buf.extend_from_slice(&footer_bytes);
121
122 Ok(buf)
123 }
124}
125
126pub fn select_codec_for_profile(col_type: &ColumnType, profile_tag: u8) -> ColumnCodec {
132 if profile_tag == PROFILE_TIMESERIES && matches!(col_type, ColumnType::Float64) {
134 return ColumnCodec::Gorilla;
135 }
136 if profile_tag == PROFILE_TIMESERIES && matches!(col_type, ColumnType::Timestamp) {
138 return ColumnCodec::DeltaFastLanesLz4;
139 }
140 select_codec(col_type)
141}
142
143fn select_codec(col_type: &ColumnType) -> ColumnCodec {
145 let hint = match col_type {
146 ColumnType::Int64 => ColumnTypeHint::Int64,
147 ColumnType::Float64 => ColumnTypeHint::Float64,
148 ColumnType::Timestamp => ColumnTypeHint::Timestamp,
149 ColumnType::String | ColumnType::Geometry | ColumnType::Regex => ColumnTypeHint::String,
150 ColumnType::Bool
151 | ColumnType::Bytes
152 | ColumnType::Decimal
153 | ColumnType::Uuid
154 | ColumnType::Ulid
155 | ColumnType::Json
156 | ColumnType::Array
157 | ColumnType::Set
158 | ColumnType::Range
159 | ColumnType::Record => {
160 return ColumnCodec::Lz4;
161 }
162 ColumnType::Duration => ColumnTypeHint::Int64, ColumnType::Vector(_) => {
164 return ColumnCodec::Lz4;
165 }
166 };
167 nodedb_codec::detect_codec(ColumnCodec::Auto, hint)
168}
169
#[cfg(test)]
mod tests {
    use nodedb_types::columnar::{ColumnDef, ColumnType, ColumnarSchema};
    use nodedb_types::value::Value;

    use super::*;
    use crate::format::{SegmentFooter, SegmentHeader};
    use crate::memtable::ColumnarMemtable;
    use crate::predicate::{ScanPredicate, bloom_may_contain};

    /// Three-column schema (`id`, `name`, nullable `score`) shared by several tests.
    fn analytics_schema() -> ColumnarSchema {
        let columns = vec![
            ColumnDef::required("id", ColumnType::Int64).with_primary_key(),
            ColumnDef::required("name", ColumnType::String),
            ColumnDef::nullable("score", ColumnType::Float64),
        ];
        ColumnarSchema::new(columns).expect("valid")
    }

    /// Drains `memtable` and writes it out with the plain profile.
    fn flush_plain(mut memtable: ColumnarMemtable) -> Vec<u8> {
        let (schema, columns, row_count) = memtable.drain();
        SegmentWriter::plain()
            .write_segment(&schema, &columns, row_count)
            .expect("write")
    }

    /// Appends a single-column string row.
    fn push_string(memtable: &mut ColumnarMemtable, text: &str) {
        memtable
            .append_row(&[Value::String(text.to_owned())])
            .expect("append");
    }

    #[test]
    fn write_segment_roundtrip() {
        let schema = analytics_schema();
        let mut memtable = ColumnarMemtable::new(&schema);

        for i in 0..100 {
            // Every third row has a null score.
            let score = if i % 3 == 0 {
                Value::Null
            } else {
                Value::Float(i as f64 * 0.25)
            };
            let row = [Value::Integer(i), Value::String(format!("user_{i}")), score];
            memtable.append_row(&row).expect("append");
        }

        let bytes = flush_plain(memtable);

        let header = SegmentHeader::from_bytes(&bytes).expect("valid header");
        assert_eq!(header.magic, *b"NDBS");
        assert_eq!(header.version_major, 1);

        let footer = SegmentFooter::from_segment_tail(&bytes).expect("valid footer");
        assert_eq!(footer.column_count, 3);
        assert_eq!(footer.row_count, 100);
        assert_eq!(footer.profile_tag, PROFILE_PLAIN);
        assert_eq!(footer.columns.len(), 3);

        let id_col = &footer.columns[0];
        assert_eq!(id_col.name, "id");
        assert_eq!(footer.columns[1].name, "name");
        assert_eq!(footer.columns[2].name, "score");

        // 100 rows fit in a single block.
        assert_eq!(id_col.block_count, 1);
        let id_block = &id_col.block_stats[0];
        assert_eq!(id_block.row_count, 100);

        assert_eq!(id_block.min, 0.0);
        assert_eq!(id_block.max, 99.0);
        assert_eq!(id_block.null_count, 0);

        // Rows 0, 3, ..., 99 are null: 34 in total.
        assert_eq!(footer.columns[2].block_stats[0].null_count, 34);
    }

    #[test]
    fn write_segment_multi_block() {
        let schema =
            ColumnarSchema::new(vec![ColumnDef::required("x", ColumnType::Int64)]).expect("valid");

        let mut memtable = ColumnarMemtable::new(&schema);
        for value in 0..2500 {
            memtable.append_row(&[Value::Integer(value)]).expect("append");
        }

        let bytes = flush_plain(memtable);

        let footer = SegmentFooter::from_segment_tail(&bytes).expect("valid footer");
        assert_eq!(footer.row_count, 2500);

        let x_col = &footer.columns[0];
        assert_eq!(x_col.block_count, 3);
        for (block_idx, expected_rows) in [1024, 1024, 452].into_iter().enumerate() {
            assert_eq!(x_col.block_stats[block_idx].row_count, expected_rows);
        }

        let first_block = &x_col.block_stats[0];
        assert_eq!(first_block.min, 0.0);
        assert_eq!(first_block.max, 1023.0);

        let last_block = &x_col.block_stats[2];
        assert_eq!(last_block.min, 2048.0);
        assert_eq!(last_block.max, 2499.0);
    }

    #[test]
    fn write_segment_empty_rejected() {
        let schema = analytics_schema();
        let mut memtable = ColumnarMemtable::new(&schema);
        let (schema, columns, row_count) = memtable.drain();

        let outcome = SegmentWriter::plain().write_segment(&schema, &columns, row_count);
        assert!(matches!(outcome, Err(ColumnarError::EmptyMemtable)));
    }

    #[test]
    fn block_stats_predicate_pushdown() {
        let schema = analytics_schema();
        let mut memtable = ColumnarMemtable::new(&schema);

        for i in 0..50 {
            let row = [
                Value::Integer(i + 100),
                Value::String(format!("item_{i}")),
                Value::Float(i as f64 + 10.0),
            ];
            memtable.append_row(&row).expect("append");
        }

        let bytes = flush_plain(memtable);
        let footer = SegmentFooter::from_segment_tail(&bytes).expect("valid");

        // The `id` column spans 100..=149.
        let id_stats = &footer.columns[0].block_stats[0];
        assert!(ScanPredicate::gt(0, 200.0).can_skip_block(id_stats));
        assert!(!ScanPredicate::gt(0, 120.0).can_skip_block(id_stats));
        assert!(ScanPredicate::lt(0, 50.0).can_skip_block(id_stats));
        assert!(ScanPredicate::eq(0, 200.0).can_skip_block(id_stats));
        assert!(!ScanPredicate::eq(0, 125.0).can_skip_block(id_stats));
    }

    #[test]
    fn string_block_stats_zone_map() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("tag", ColumnType::String)])
            .expect("valid");

        let mut memtable = ColumnarMemtable::new(&schema);
        let generated: Vec<String> = (0..20).map(|i| format!("item_{i:02}")).collect();
        for text in &generated {
            push_string(&mut memtable, text);
        }
        push_string(&mut memtable, "apple");
        push_string(&mut memtable, "date");

        let bytes = flush_plain(memtable);
        let footer = SegmentFooter::from_segment_tail(&bytes).expect("footer");

        let stats = &footer.columns[0].block_stats[0];
        assert!(stats.str_min.is_some(), "str_min should be populated");
        assert!(stats.str_max.is_some(), "str_max should be populated");
        assert_eq!(stats.str_min.as_deref(), Some("apple"));
        assert_eq!(stats.str_max.as_deref(), Some("item_19"));

        assert!(
            stats.bloom.is_some(),
            "bloom should be populated for >16 distinct values"
        );

        assert!(ScanPredicate::str_eq(0, "aaa").can_skip_block(stats));
        assert!(ScanPredicate::str_eq(0, "zzz").can_skip_block(stats));
        assert!(!ScanPredicate::str_eq(0, "date").can_skip_block(stats));
        assert!(ScanPredicate::str_gt(0, "item_19").can_skip_block(stats));
        assert!(ScanPredicate::str_lt(0, "apple").can_skip_block(stats));
    }

    #[test]
    fn string_block_stats_bloom_rejects_absent_value() {
        let schema = ColumnarSchema::new(vec![ColumnDef::required("label", ColumnType::String)])
            .expect("valid");

        let mut memtable = ColumnarMemtable::new(&schema);
        let generated: Vec<String> = (0..20).map(|i| format!("val_{i:02}")).collect();
        for text in &generated {
            push_string(&mut memtable, text);
        }
        for text in ["alpha", "beta", "gamma"] {
            push_string(&mut memtable, text);
        }

        let bytes = flush_plain(memtable);
        let footer = SegmentFooter::from_segment_tail(&bytes).expect("footer");
        let stats = &footer.columns[0].block_stats[0];

        let bloom = stats
            .bloom
            .as_deref()
            .expect("bloom present for >16 distinct");
        assert!(bloom_may_contain(bloom, "alpha"));
        assert!(bloom_may_contain(bloom, "beta"));
        assert!(bloom_may_contain(bloom, "gamma"));

        // A bloom filter may report false positives, so the skip assertion
        // only applies when the filter actually rules "delta" out.
        if !bloom_may_contain(bloom, "delta") {
            assert!(ScanPredicate::str_eq(0, "delta").can_skip_block(stats));
        }
    }
}