Skip to main content

nodedb_codec/
fsst.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! FSST (Fast Static Symbol Table) codec for string/log columns.
4//!
5//! Builds a lightweight dictionary of common substrings (1-8 bytes) and
6//! encodes strings as sequences of symbol table indices. Unlike whole-string
7//! dictionary encoding, FSST handles partial overlap — strings sharing
8//! prefixes or suffixes compress well even if no two strings are identical.
9//!
10//! Compression: 3-5x on string columns before any terminal compressor.
11//! Combined with lz4_flex terminal: 8-15x total on structured log data.
12//!
13//! Decompression: simple table lookup — fast enough to query directly
14//! over encoded data.
15//!
16//! Wire format:
17//! ```text
18//! [2 bytes] symbol count (LE u16, max 255)
19//! [symbol_count × (1 + len) bytes] symbol table: (len: u8, bytes: [u8; len])
20//! [4 bytes] total encoded length (LE u32)
21//! [4 bytes] string count (LE u32)
22//! [string_count × 4 bytes] encoded string offsets (cumulative LE u32)
23//! [N bytes] encoded data (symbol indices interleaved with escape+literal)
24//! ```
25//!
26//! Escape mechanism: byte value 255 followed by a literal byte encodes
27//! bytes not covered by any symbol. Symbol indices are 0..254.
28
29use crate::error::CodecError;
30
/// Maximum number of symbols in the table (reserve 255 as escape).
///
/// With at most 255 entries, symbol indices span `0..=254`, so an index can
/// never collide with the `ESCAPE` byte value.
const MAX_SYMBOLS: usize = 255;

/// Maximum symbol length in bytes.
///
/// Training only considers candidate n-grams of 1 to this many bytes.
const MAX_SYMBOL_LEN: usize = 8;

/// Escape byte: signals the next byte is a literal (not a symbol index).
const ESCAPE: u8 = 255;

/// Number of training passes over the input data.
const TRAINING_ROUNDS: usize = 5;
42
43// ---------------------------------------------------------------------------
44// Symbol table
45// ---------------------------------------------------------------------------
46
47/// A trained FSST symbol table.
48#[derive(Debug, Clone)]
49struct SymbolTable {
50    /// Symbols sorted by length (longest first) for greedy matching.
51    symbols: Vec<Vec<u8>>,
52}
53
54impl SymbolTable {
55    /// Train a symbol table from a set of input strings.
56    ///
57    /// Uses iterative count-based selection: in each round, count how many
58    /// bytes each candidate n-gram would save, pick the best, repeat.
59    fn train(strings: &[&[u8]]) -> Self {
60        if strings.is_empty() {
61            return Self {
62                symbols: Vec::new(),
63            };
64        }
65
66        let mut symbols: Vec<Vec<u8>> = Vec::new();
67        let mut symbol_set: std::collections::HashSet<Vec<u8>> = std::collections::HashSet::new();
68        let mut candidates: std::collections::HashMap<Vec<u8>, usize> =
69            std::collections::HashMap::new();
70
71        for _round in 0..TRAINING_ROUNDS {
72            // Count n-gram frequencies in the data (after encoding with current table).
73            candidates.clear();
74
75            for s in strings {
76                // Scan for n-grams of length 1-8 that are NOT already covered by symbols.
77                let mut pos = 0;
78                while pos < s.len() {
79                    // Check if current position starts with a known symbol.
80                    let existing_match = longest_symbol_match(&symbols, s, pos);
81
82                    if existing_match > 0 {
83                        pos += existing_match;
84                        continue;
85                    }
86
87                    // No existing symbol matches — count new n-gram candidates.
88                    for len in 1..=MAX_SYMBOL_LEN.min(s.len() - pos) {
89                        let ngram = &s[pos..pos + len];
90                        *candidates.entry(ngram.to_vec()).or_insert(0) += 1;
91                    }
92                    pos += 1;
93                }
94            }
95
96            if candidates.is_empty() {
97                break;
98            }
99
100            // Score candidates by compression gain: frequency * (length - 1).
101            // Each symbol saves (length - 1) bytes per occurrence (1 byte for
102            // the symbol index vs `length` bytes raw).
103            let mut scored: Vec<(Vec<u8>, usize)> = candidates
104                .drain()
105                .map(|(ngram, freq)| {
106                    let gain = freq * (ngram.len().saturating_sub(1));
107                    (ngram, gain)
108                })
109                .filter(|(_, gain)| *gain > 0)
110                .collect();
111
112            scored.sort_by_key(|a| std::cmp::Reverse(a.1));
113
114            // Add top candidates that don't duplicate existing symbols.
115            for (ngram, _) in scored {
116                if symbols.len() >= MAX_SYMBOLS {
117                    break;
118                }
119                if symbol_set.insert(ngram.clone()) {
120                    symbols.push(ngram);
121                }
122            }
123        }
124
125        // Sort symbols longest-first for greedy matching.
126        symbols.sort_by_key(|a| std::cmp::Reverse(a.len()));
127
128        Self { symbols }
129    }
130
131    fn symbol_count(&self) -> usize {
132        self.symbols.len()
133    }
134}
135
/// Find the longest symbol matching at position `pos` in `data`.
///
/// Every symbol is examined, so the answer does not depend on the order of
/// `symbols` (during training the table is not yet sorted longest-first).
///
/// Returns the match length in bytes, or 0 if no symbol is a prefix of
/// `data[pos..]` or if `pos` lies beyond the end of `data`.
fn longest_symbol_match(symbols: &[Vec<u8>], data: &[u8], pos: usize) -> usize {
    // `get` instead of indexing: an out-of-range position means "no match",
    // not a panic.
    let remaining = match data.get(pos..) {
        Some(rest) => rest,
        None => return 0,
    };

    symbols
        .iter()
        .filter(|sym| remaining.starts_with(sym.as_slice()))
        .map(|sym| sym.len())
        .max()
        .unwrap_or(0)
}
147
148// ---------------------------------------------------------------------------
149// Public encode / decode API
150// ---------------------------------------------------------------------------
151
152/// Encode a batch of strings using FSST compression.
153///
154/// Trains a symbol table on the input, then encodes each string as a
155/// sequence of symbol indices and escaped literals.
156pub fn encode(strings: &[&[u8]]) -> Vec<u8> {
157    let table = SymbolTable::train(strings);
158
159    // Encode each string.
160    let mut encoded_strings: Vec<Vec<u8>> = Vec::with_capacity(strings.len());
161    for s in strings {
162        encoded_strings.push(encode_string(&table, s));
163    }
164
165    // Build wire format.
166    let mut out = Vec::new();
167
168    // Symbol table.
169    out.extend_from_slice(&(table.symbol_count() as u16).to_le_bytes());
170    for sym in &table.symbols {
171        out.push(sym.len() as u8);
172        out.extend_from_slice(sym);
173    }
174
175    // Encoded strings with offset table.
176    let total_encoded: usize = encoded_strings.iter().map(|s| s.len()).sum();
177    out.extend_from_slice(&(total_encoded as u32).to_le_bytes());
178    out.extend_from_slice(&(strings.len() as u32).to_le_bytes());
179
180    // Cumulative offsets.
181    let mut offset = 0u32;
182    for es in &encoded_strings {
183        offset += es.len() as u32;
184        out.extend_from_slice(&offset.to_le_bytes());
185    }
186
187    // Encoded data.
188    for es in &encoded_strings {
189        out.extend_from_slice(es);
190    }
191
192    out
193}
194
195/// Decode FSST-compressed data back to strings.
196pub fn decode(data: &[u8]) -> Result<Vec<Vec<u8>>, CodecError> {
197    if data.len() < 2 {
198        return Err(CodecError::Truncated {
199            expected: 2,
200            actual: data.len(),
201        });
202    }
203
204    // Read symbol table.
205    let sym_count = u16::from_le_bytes([data[0], data[1]]) as usize;
206    let mut pos = 2;
207    let mut symbols: Vec<Vec<u8>> = Vec::with_capacity(sym_count);
208
209    for _ in 0..sym_count {
210        if pos >= data.len() {
211            return Err(CodecError::Truncated {
212                expected: pos + 1,
213                actual: data.len(),
214            });
215        }
216        let len = data[pos] as usize;
217        pos += 1;
218        if pos + len > data.len() {
219            return Err(CodecError::Truncated {
220                expected: pos + len,
221                actual: data.len(),
222            });
223        }
224        symbols.push(data[pos..pos + len].to_vec());
225        pos += len;
226    }
227
228    // Read header.
229    if pos + 8 > data.len() {
230        return Err(CodecError::Truncated {
231            expected: pos + 8,
232            actual: data.len(),
233        });
234    }
235    let _total_encoded =
236        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
237    pos += 4;
238    let string_count =
239        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
240    pos += 4;
241
242    // Read offsets.
243    let offsets_size = string_count * 4;
244    if pos + offsets_size > data.len() {
245        return Err(CodecError::Truncated {
246            expected: pos + offsets_size,
247            actual: data.len(),
248        });
249    }
250    let mut offsets = Vec::with_capacity(string_count);
251    for i in 0..string_count {
252        let off_pos = pos + i * 4;
253        offsets.push(u32::from_le_bytes([
254            data[off_pos],
255            data[off_pos + 1],
256            data[off_pos + 2],
257            data[off_pos + 3],
258        ]) as usize);
259    }
260    pos += offsets_size;
261
262    let encoded_data = &data[pos..];
263
264    // Decode each string.
265    let mut result = Vec::with_capacity(string_count);
266    let mut prev_end = 0;
267    for &end in &offsets {
268        if end > encoded_data.len() {
269            return Err(CodecError::Truncated {
270                expected: pos + end,
271                actual: data.len(),
272            });
273        }
274        let encoded_str = &encoded_data[prev_end..end];
275        result.push(decode_string(&symbols, encoded_str)?);
276        prev_end = end;
277    }
278
279    Ok(result)
280}
281
282/// Convenience: encode a single contiguous byte buffer that contains
283/// multiple strings separated by a delimiter (e.g., newlines for log data).
284pub fn encode_delimited(data: &[u8], delimiter: u8) -> Vec<u8> {
285    let strings: Vec<&[u8]> = data.split(|&b| b == delimiter).collect();
286    encode(&strings)
287}
288
289/// Convenience: decode and reassemble with delimiter.
290pub fn decode_delimited(data: &[u8], delimiter: u8) -> Result<Vec<u8>, CodecError> {
291    let strings = decode(data)?;
292    let mut out = Vec::new();
293    for (i, s) in strings.iter().enumerate() {
294        if i > 0 {
295            out.push(delimiter);
296        }
297        out.extend_from_slice(s);
298    }
299    Ok(out)
300}
301
302// ---------------------------------------------------------------------------
303// Per-string encode / decode
304// ---------------------------------------------------------------------------
305
306fn encode_string(table: &SymbolTable, input: &[u8]) -> Vec<u8> {
307    let mut out = Vec::with_capacity(input.len());
308    let mut pos = 0;
309
310    while pos < input.len() {
311        // Greedy: try to match the longest symbol at current position.
312        let mut matched = false;
313        for (idx, sym) in table.symbols.iter().enumerate() {
314            if input[pos..].starts_with(sym) {
315                out.push(idx as u8);
316                pos += sym.len();
317                matched = true;
318                break;
319            }
320        }
321
322        if !matched {
323            // No symbol matches — emit escape + literal byte.
324            out.push(ESCAPE);
325            out.push(input[pos]);
326            pos += 1;
327        }
328    }
329
330    out
331}
332
333fn decode_string(symbols: &[Vec<u8>], encoded: &[u8]) -> Result<Vec<u8>, CodecError> {
334    let mut out = Vec::with_capacity(encoded.len() * 2);
335    let mut pos = 0;
336
337    while pos < encoded.len() {
338        let byte = encoded[pos];
339        pos += 1;
340
341        if byte == ESCAPE {
342            // Next byte is a literal.
343            if pos >= encoded.len() {
344                return Err(CodecError::Corrupt {
345                    detail: "FSST escape at end of encoded data".into(),
346                });
347            }
348            out.push(encoded[pos]);
349            pos += 1;
350        } else {
351            // Symbol index.
352            let idx = byte as usize;
353            if idx >= symbols.len() {
354                return Err(CodecError::Corrupt {
355                    detail: format!(
356                        "FSST symbol index {idx} out of range (max {})",
357                        symbols.len()
358                    ),
359                });
360            }
361            out.extend_from_slice(&symbols[idx]);
362        }
363    }
364
365    Ok(out)
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal stream with an empty symbol table and one string
    /// whose encoded bytes are `payload` and whose end offset is `end`.
    fn single_string_stream(end: u32, payload: &[u8]) -> Vec<u8> {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&0u16.to_le_bytes()); // symbol count
        bytes.extend_from_slice(&(payload.len() as u32).to_le_bytes()); // total encoded
        bytes.extend_from_slice(&1u32.to_le_bytes()); // string count
        bytes.extend_from_slice(&end.to_le_bytes()); // end offset of string 0
        bytes.extend_from_slice(payload);
        bytes
    }

    #[test]
    fn empty_input() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_string() {
        let strings: Vec<&[u8]> = vec![b"hello world"];
        let encoded = encode(&strings);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0], b"hello world");
    }

    #[test]
    fn multiple_strings_roundtrip() {
        let strings: Vec<&[u8]> = vec![
            b"us-east-1",
            b"us-east-2",
            b"us-west-1",
            b"eu-west-1",
            b"us-east-1",
            b"us-east-1",
        ];
        let encoded = encode(&strings);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), strings.len());
        for (a, b) in strings.iter().zip(decoded.iter()) {
            assert_eq!(*a, b.as_slice());
        }
    }

    #[test]
    fn repetitive_log_lines() {
        let lines: Vec<&[u8]> = (0..1000)
            .map(|i| {
                let s: &[u8] = match i % 5 {
                    0 => b"2024-01-15 INFO server.handler request_id=abc method=GET status=200",
                    1 => b"2024-01-15 INFO server.handler request_id=def method=POST status=201",
                    2 => b"2024-01-15 WARN server.handler request_id=ghi method=GET status=404",
                    3 => b"2024-01-15 ERROR server.handler request_id=jkl method=PUT status=500",
                    _ => b"2024-01-15 DEBUG server.handler request_id=mno method=GET status=200",
                };
                s
            })
            .collect();

        let encoded = encode(&lines);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), lines.len());
        for (a, b) in lines.iter().zip(decoded.iter()) {
            assert_eq!(*a, b.as_slice());
        }

        // FSST should compress repetitive logs.
        let raw_size: usize = lines.iter().map(|s| s.len()).sum();
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 1.5,
            "FSST should compress repetitive logs >1.5x, got {ratio:.1}x"
        );
    }

    #[test]
    fn hostnames() {
        let hosts: Vec<&[u8]> = vec![
            b"prod-web-01.us-east-1.example.com",
            b"prod-web-02.us-east-1.example.com",
            b"prod-web-03.us-east-1.example.com",
            b"prod-api-01.us-west-2.example.com",
            b"prod-api-02.us-west-2.example.com",
            b"staging-web-01.eu-west-1.example.com",
        ];
        let encoded = encode(&hosts);
        let decoded = decode(&encoded).unwrap();
        // `zip` stops at the shorter side, so the count is checked separately.
        assert_eq!(decoded.len(), hosts.len());
        for (a, b) in hosts.iter().zip(decoded.iter()) {
            assert_eq!(*a, b.as_slice());
        }
    }

    #[test]
    fn binary_data() {
        // Binary data with no patterns — should still roundtrip (escape every byte).
        let data: Vec<&[u8]> = vec![&[0, 1, 2, 3, 4, 255, 254, 253]];
        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0], data[0]);
    }

    #[test]
    fn empty_strings() {
        let strings: Vec<&[u8]> = vec![b"", b"hello", b"", b"world", b""];
        let encoded = encode(&strings);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), 5);
        assert!(decoded[0].is_empty());
        assert_eq!(decoded[1], b"hello");
        assert!(decoded[2].is_empty());
        assert_eq!(decoded[3], b"world");
        assert!(decoded[4].is_empty());
    }

    #[test]
    fn delimited_roundtrip() {
        let data = b"line one\nline two\nline three\nline one\nline two";
        let encoded = encode_delimited(data, b'\n');
        let decoded = decode_delimited(&encoded, b'\n').unwrap();
        assert_eq!(decoded, data);
    }

    #[test]
    fn compression_ratio_structured_logs() {
        let mut lines: Vec<Vec<u8>> = Vec::new();
        for i in 0..5000 {
            let line = format!(
                "2024-01-15T10:30:{:02}.000Z INFO server.handler request_id={} method=GET path=/api/v1/metrics status=200 duration_ms={}",
                i % 60,
                10000 + i,
                i * 3 + 1
            );
            lines.push(line.into_bytes());
        }
        let refs: Vec<&[u8]> = lines.iter().map(|l| l.as_slice()).collect();

        let encoded = encode(&refs);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), lines.len());
        for (a, b) in lines.iter().zip(decoded.iter()) {
            assert_eq!(a.as_slice(), b.as_slice());
        }

        let raw_size: usize = lines.iter().map(|s| s.len()).sum();
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 1.5,
            "FSST should compress structured logs >1.5x, got {ratio:.1}x"
        );
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[1]).is_err());
    }

    #[test]
    fn truncated_payload_errors() {
        // The offset claims five encoded bytes but only one is present.
        let bytes = single_string_stream(5, &[ESCAPE]);
        assert!(decode(&bytes).is_err());
    }

    #[test]
    fn corrupt_payload_errors() {
        // An escape byte with no literal after it.
        let dangling_escape = single_string_stream(1, &[ESCAPE]);
        assert!(decode(&dangling_escape).is_err());

        // Symbol index 0 while the symbol table is empty.
        let bad_index = single_string_stream(1, &[0]);
        assert!(decode(&bad_index).is_err());
    }

    #[test]
    fn large_dataset() {
        let mut strings: Vec<Vec<u8>> = Vec::new();
        for i in 0..10_000 {
            strings.push(format!("key-{}-value-{}", i % 100, i % 50).into_bytes());
        }
        let refs: Vec<&[u8]> = strings.iter().map(|s| s.as_slice()).collect();
        let encoded = encode(&refs);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), strings.len());
        for (a, b) in strings.iter().zip(decoded.iter()) {
            assert_eq!(a.as_slice(), b.as_slice());
        }
    }
}