Skip to main content

nodedb_codec/
fsst.rs

1//! FSST (Fast Static Symbol Table) codec for string/log columns.
2//!
3//! Builds a lightweight dictionary of common substrings (1-8 bytes) and
4//! encodes strings as sequences of symbol table indices. Unlike whole-string
5//! dictionary encoding, FSST handles partial overlap — strings sharing
6//! prefixes or suffixes compress well even if no two strings are identical.
7//!
8//! Compression: 3-5x on string columns before any terminal compressor.
9//! Combined with lz4_flex terminal: 8-15x total on structured log data.
10//!
11//! Decompression: simple table lookup — fast enough to query directly
12//! over encoded data.
13//!
14//! Wire format:
15//! ```text
16//! [2 bytes] symbol count (LE u16, max 255)
17//! [symbol_count × (1 + len) bytes] symbol table: (len: u8, bytes: [u8; len])
18//! [4 bytes] total encoded length (LE u32)
19//! [4 bytes] string count (LE u32)
20//! [string_count × 4 bytes] encoded string offsets (cumulative LE u32)
21//! [N bytes] encoded data (symbol indices interleaved with escape+literal)
22//! ```
23//!
24//! Escape mechanism: byte value 255 followed by a literal byte encodes
25//! bytes not covered by any symbol. Symbol indices are 0..254.
26
27use crate::error::CodecError;
28
/// Escape marker in the encoded stream: the byte that follows it is copied
/// to the output verbatim instead of being looked up in the symbol table.
const ESCAPE: u8 = u8::MAX;

/// Upper bound on the number of symbol table entries. Symbol indices share
/// the byte space with [`ESCAPE`], so only the values below it are usable.
const MAX_SYMBOLS: usize = ESCAPE as usize;

/// Longest symbol, in bytes, that training will consider.
const MAX_SYMBOL_LEN: usize = 8;

/// Upper bound on the number of training passes over the input data.
const TRAINING_ROUNDS: usize = 5;
40
41// ---------------------------------------------------------------------------
42// Symbol table
43// ---------------------------------------------------------------------------
44
45/// A trained FSST symbol table.
46#[derive(Debug, Clone)]
47struct SymbolTable {
48    /// Symbols sorted by length (longest first) for greedy matching.
49    symbols: Vec<Vec<u8>>,
50}
51
52impl SymbolTable {
53    /// Train a symbol table from a set of input strings.
54    ///
55    /// Uses iterative count-based selection: in each round, count how many
56    /// bytes each candidate n-gram would save, pick the best, repeat.
57    fn train(strings: &[&[u8]]) -> Self {
58        if strings.is_empty() {
59            return Self {
60                symbols: Vec::new(),
61            };
62        }
63
64        let mut symbols: Vec<Vec<u8>> = Vec::new();
65        let mut symbol_set: std::collections::HashSet<Vec<u8>> = std::collections::HashSet::new();
66        let mut candidates: std::collections::HashMap<Vec<u8>, usize> =
67            std::collections::HashMap::new();
68
69        for _round in 0..TRAINING_ROUNDS {
70            // Count n-gram frequencies in the data (after encoding with current table).
71            candidates.clear();
72
73            for s in strings {
74                // Scan for n-grams of length 1-8 that are NOT already covered by symbols.
75                let mut pos = 0;
76                while pos < s.len() {
77                    // Check if current position starts with a known symbol.
78                    let existing_match = longest_symbol_match(&symbols, s, pos);
79
80                    if existing_match > 0 {
81                        pos += existing_match;
82                        continue;
83                    }
84
85                    // No existing symbol matches — count new n-gram candidates.
86                    for len in 1..=MAX_SYMBOL_LEN.min(s.len() - pos) {
87                        let ngram = &s[pos..pos + len];
88                        *candidates.entry(ngram.to_vec()).or_insert(0) += 1;
89                    }
90                    pos += 1;
91                }
92            }
93
94            if candidates.is_empty() {
95                break;
96            }
97
98            // Score candidates by compression gain: frequency * (length - 1).
99            // Each symbol saves (length - 1) bytes per occurrence (1 byte for
100            // the symbol index vs `length` bytes raw).
101            let mut scored: Vec<(Vec<u8>, usize)> = candidates
102                .drain()
103                .map(|(ngram, freq)| {
104                    let gain = freq * (ngram.len().saturating_sub(1));
105                    (ngram, gain)
106                })
107                .filter(|(_, gain)| *gain > 0)
108                .collect();
109
110            scored.sort_by_key(|a| std::cmp::Reverse(a.1));
111
112            // Add top candidates that don't duplicate existing symbols.
113            for (ngram, _) in scored {
114                if symbols.len() >= MAX_SYMBOLS {
115                    break;
116                }
117                if symbol_set.insert(ngram.clone()) {
118                    symbols.push(ngram);
119                }
120            }
121        }
122
123        // Sort symbols longest-first for greedy matching.
124        symbols.sort_by_key(|a| std::cmp::Reverse(a.len()));
125
126        Self { symbols }
127    }
128
129    fn symbol_count(&self) -> usize {
130        self.symbols.len()
131    }
132}
133
/// Find the longest symbol matching at position `pos` in `data`.
///
/// Every symbol is examined, so the result does not depend on the order of
/// `symbols`. Returns the match length in bytes, or 0 when no symbol is a
/// prefix of `data[pos..]` or when `pos` lies past the end of `data`.
fn longest_symbol_match(symbols: &[Vec<u8>], data: &[u8], pos: usize) -> usize {
    data.get(pos..).map_or(0, |remaining| {
        symbols
            .iter()
            .filter(|sym| remaining.starts_with(sym.as_slice()))
            .map(Vec::len)
            .max()
            .unwrap_or(0)
    })
}
145
146// ---------------------------------------------------------------------------
147// Public encode / decode API
148// ---------------------------------------------------------------------------
149
150/// Encode a batch of strings using FSST compression.
151///
152/// Trains a symbol table on the input, then encodes each string as a
153/// sequence of symbol indices and escaped literals.
154pub fn encode(strings: &[&[u8]]) -> Vec<u8> {
155    let table = SymbolTable::train(strings);
156
157    // Encode each string.
158    let mut encoded_strings: Vec<Vec<u8>> = Vec::with_capacity(strings.len());
159    for s in strings {
160        encoded_strings.push(encode_string(&table, s));
161    }
162
163    // Build wire format.
164    let mut out = Vec::new();
165
166    // Symbol table.
167    out.extend_from_slice(&(table.symbol_count() as u16).to_le_bytes());
168    for sym in &table.symbols {
169        out.push(sym.len() as u8);
170        out.extend_from_slice(sym);
171    }
172
173    // Encoded strings with offset table.
174    let total_encoded: usize = encoded_strings.iter().map(|s| s.len()).sum();
175    out.extend_from_slice(&(total_encoded as u32).to_le_bytes());
176    out.extend_from_slice(&(strings.len() as u32).to_le_bytes());
177
178    // Cumulative offsets.
179    let mut offset = 0u32;
180    for es in &encoded_strings {
181        offset += es.len() as u32;
182        out.extend_from_slice(&offset.to_le_bytes());
183    }
184
185    // Encoded data.
186    for es in &encoded_strings {
187        out.extend_from_slice(es);
188    }
189
190    out
191}
192
193/// Decode FSST-compressed data back to strings.
194pub fn decode(data: &[u8]) -> Result<Vec<Vec<u8>>, CodecError> {
195    if data.len() < 2 {
196        return Err(CodecError::Truncated {
197            expected: 2,
198            actual: data.len(),
199        });
200    }
201
202    // Read symbol table.
203    let sym_count = u16::from_le_bytes([data[0], data[1]]) as usize;
204    let mut pos = 2;
205    let mut symbols: Vec<Vec<u8>> = Vec::with_capacity(sym_count);
206
207    for _ in 0..sym_count {
208        if pos >= data.len() {
209            return Err(CodecError::Truncated {
210                expected: pos + 1,
211                actual: data.len(),
212            });
213        }
214        let len = data[pos] as usize;
215        pos += 1;
216        if pos + len > data.len() {
217            return Err(CodecError::Truncated {
218                expected: pos + len,
219                actual: data.len(),
220            });
221        }
222        symbols.push(data[pos..pos + len].to_vec());
223        pos += len;
224    }
225
226    // Read header.
227    if pos + 8 > data.len() {
228        return Err(CodecError::Truncated {
229            expected: pos + 8,
230            actual: data.len(),
231        });
232    }
233    let _total_encoded =
234        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
235    pos += 4;
236    let string_count =
237        u32::from_le_bytes([data[pos], data[pos + 1], data[pos + 2], data[pos + 3]]) as usize;
238    pos += 4;
239
240    // Read offsets.
241    let offsets_size = string_count * 4;
242    if pos + offsets_size > data.len() {
243        return Err(CodecError::Truncated {
244            expected: pos + offsets_size,
245            actual: data.len(),
246        });
247    }
248    let mut offsets = Vec::with_capacity(string_count);
249    for i in 0..string_count {
250        let off_pos = pos + i * 4;
251        offsets.push(u32::from_le_bytes([
252            data[off_pos],
253            data[off_pos + 1],
254            data[off_pos + 2],
255            data[off_pos + 3],
256        ]) as usize);
257    }
258    pos += offsets_size;
259
260    let encoded_data = &data[pos..];
261
262    // Decode each string.
263    let mut result = Vec::with_capacity(string_count);
264    let mut prev_end = 0;
265    for &end in &offsets {
266        if end > encoded_data.len() {
267            return Err(CodecError::Truncated {
268                expected: pos + end,
269                actual: data.len(),
270            });
271        }
272        let encoded_str = &encoded_data[prev_end..end];
273        result.push(decode_string(&symbols, encoded_str)?);
274        prev_end = end;
275    }
276
277    Ok(result)
278}
279
280/// Convenience: encode a single contiguous byte buffer that contains
281/// multiple strings separated by a delimiter (e.g., newlines for log data).
282pub fn encode_delimited(data: &[u8], delimiter: u8) -> Vec<u8> {
283    let strings: Vec<&[u8]> = data.split(|&b| b == delimiter).collect();
284    encode(&strings)
285}
286
287/// Convenience: decode and reassemble with delimiter.
288pub fn decode_delimited(data: &[u8], delimiter: u8) -> Result<Vec<u8>, CodecError> {
289    let strings = decode(data)?;
290    let mut out = Vec::new();
291    for (i, s) in strings.iter().enumerate() {
292        if i > 0 {
293            out.push(delimiter);
294        }
295        out.extend_from_slice(s);
296    }
297    Ok(out)
298}
299
300// ---------------------------------------------------------------------------
301// Per-string encode / decode
302// ---------------------------------------------------------------------------
303
304fn encode_string(table: &SymbolTable, input: &[u8]) -> Vec<u8> {
305    let mut out = Vec::with_capacity(input.len());
306    let mut pos = 0;
307
308    while pos < input.len() {
309        // Greedy: try to match the longest symbol at current position.
310        let mut matched = false;
311        for (idx, sym) in table.symbols.iter().enumerate() {
312            if input[pos..].starts_with(sym) {
313                out.push(idx as u8);
314                pos += sym.len();
315                matched = true;
316                break;
317            }
318        }
319
320        if !matched {
321            // No symbol matches — emit escape + literal byte.
322            out.push(ESCAPE);
323            out.push(input[pos]);
324            pos += 1;
325        }
326    }
327
328    out
329}
330
331fn decode_string(symbols: &[Vec<u8>], encoded: &[u8]) -> Result<Vec<u8>, CodecError> {
332    let mut out = Vec::with_capacity(encoded.len() * 2);
333    let mut pos = 0;
334
335    while pos < encoded.len() {
336        let byte = encoded[pos];
337        pos += 1;
338
339        if byte == ESCAPE {
340            // Next byte is a literal.
341            if pos >= encoded.len() {
342                return Err(CodecError::Corrupt {
343                    detail: "FSST escape at end of encoded data".into(),
344                });
345            }
346            out.push(encoded[pos]);
347            pos += 1;
348        } else {
349            // Symbol index.
350            let idx = byte as usize;
351            if idx >= symbols.len() {
352                return Err(CodecError::Corrupt {
353                    detail: format!(
354                        "FSST symbol index {idx} out of range (max {})",
355                        symbols.len()
356                    ),
357                });
358            }
359            out.extend_from_slice(&symbols[idx]);
360        }
361    }
362
363    Ok(out)
364}
365
#[cfg(test)]
mod tests {
    use super::*;

    /// Encode then decode `strings`, asserting that every string comes back
    /// unchanged and in order. Returns the encoded bytes for further checks.
    fn assert_roundtrip(strings: &[&[u8]]) -> Vec<u8> {
        let encoded = encode(strings);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), strings.len());
        for (expected, actual) in strings.iter().zip(decoded.iter()) {
            assert_eq!(*expected, actual.as_slice());
        }
        encoded
    }

    /// Build a frame with an empty symbol table holding one string whose
    /// encoded bytes are exactly `payload`.
    fn frame_without_symbols(payload: &[u8]) -> Vec<u8> {
        let len = payload.len() as u32;
        let mut frame = vec![0u8, 0]; // symbol count = 0
        frame.extend_from_slice(&len.to_le_bytes()); // total encoded length
        frame.extend_from_slice(&1u32.to_le_bytes()); // string count
        frame.extend_from_slice(&len.to_le_bytes()); // end offset of the string
        frame.extend_from_slice(payload);
        frame
    }

    #[test]
    fn empty_input() {
        let encoded = encode(&[]);
        let decoded = decode(&encoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn single_string() {
        let strings: Vec<&[u8]> = vec![b"hello world"];
        let encoded = encode(&strings);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0], b"hello world");
    }

    #[test]
    fn multiple_strings_roundtrip() {
        let strings: Vec<&[u8]> = vec![
            b"us-east-1",
            b"us-east-2",
            b"us-west-1",
            b"eu-west-1",
            b"us-east-1",
            b"us-east-1",
        ];
        assert_roundtrip(&strings);
    }

    #[test]
    fn repetitive_log_lines() {
        let lines: Vec<&[u8]> = (0..1000)
            .map(|i| {
                let s: &[u8] = match i % 5 {
                    0 => b"2024-01-15 INFO server.handler request_id=abc method=GET status=200",
                    1 => b"2024-01-15 INFO server.handler request_id=def method=POST status=201",
                    2 => b"2024-01-15 WARN server.handler request_id=ghi method=GET status=404",
                    3 => b"2024-01-15 ERROR server.handler request_id=jkl method=PUT status=500",
                    _ => b"2024-01-15 DEBUG server.handler request_id=mno method=GET status=200",
                };
                s
            })
            .collect();

        let encoded = assert_roundtrip(&lines);

        // FSST should compress repetitive logs.
        let raw_size: usize = lines.iter().map(|s| s.len()).sum();
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 1.5,
            "FSST should compress repetitive logs >1.5x, got {ratio:.1}x"
        );
    }

    #[test]
    fn hostnames() {
        let hosts: Vec<&[u8]> = vec![
            b"prod-web-01.us-east-1.example.com",
            b"prod-web-02.us-east-1.example.com",
            b"prod-web-03.us-east-1.example.com",
            b"prod-api-01.us-west-2.example.com",
            b"prod-api-02.us-west-2.example.com",
            b"staging-web-01.eu-west-1.example.com",
        ];
        // Also checks the decoded count, so a short result cannot pass.
        assert_roundtrip(&hosts);
    }

    #[test]
    fn binary_data() {
        // Binary data with no repeated patterns must still roundtrip.
        let data: Vec<&[u8]> = vec![&[0, 1, 2, 3, 4, 255, 254, 253]];
        let encoded = encode(&data);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), 1);
        assert_eq!(decoded[0], data[0]);
    }

    #[test]
    fn every_byte_value_roundtrips() {
        // Covers the escape byte value and every possible literal.
        let all: Vec<u8> = (0..=255u8).collect();
        let strings = [all.as_slice(), all.as_slice()];
        assert_roundtrip(&strings);
    }

    #[test]
    fn empty_strings() {
        let strings: Vec<&[u8]> = vec![b"", b"hello", b"", b"world", b""];
        let encoded = encode(&strings);
        let decoded = decode(&encoded).unwrap();
        assert_eq!(decoded.len(), 5);
        assert!(decoded[0].is_empty());
        assert_eq!(decoded[1], b"hello");
        assert!(decoded[2].is_empty());
        assert_eq!(decoded[3], b"world");
        assert!(decoded[4].is_empty());
    }

    #[test]
    fn delimited_roundtrip() {
        let data = b"line one\nline two\nline three\nline one\nline two";
        let encoded = encode_delimited(data, b'\n');
        let decoded = decode_delimited(&encoded, b'\n').unwrap();
        assert_eq!(decoded, data);
    }

    #[test]
    fn delimited_edge_cases() {
        // Empty input, leading/trailing delimiters and adjacent delimiters
        // must all be reproduced exactly.
        let cases: [&[u8]; 4] = [b"", b"\n", b"a\nb\n", b"\n\na\n\n"];
        for data in cases.iter() {
            let encoded = encode_delimited(data, b'\n');
            let decoded = decode_delimited(&encoded, b'\n').unwrap();
            assert_eq!(decoded.as_slice(), *data);
        }
    }

    #[test]
    fn compression_ratio_structured_logs() {
        let mut lines: Vec<Vec<u8>> = Vec::new();
        for i in 0..5000 {
            let line = format!(
                "2024-01-15T10:30:{:02}.000Z INFO server.handler request_id={} method=GET path=/api/v1/metrics status=200 duration_ms={}",
                i % 60,
                10000 + i,
                i * 3 + 1
            );
            lines.push(line.into_bytes());
        }
        let refs: Vec<&[u8]> = lines.iter().map(|l| l.as_slice()).collect();

        // Checks content as well as count, so a lossy encoding cannot pass
        // on its compression ratio alone.
        let encoded = assert_roundtrip(&refs);

        let raw_size: usize = lines.iter().map(|s| s.len()).sum();
        let ratio = raw_size as f64 / encoded.len() as f64;
        assert!(
            ratio > 1.5,
            "FSST should compress structured logs >1.5x, got {ratio:.1}x"
        );
    }

    #[test]
    fn truncated_input_errors() {
        assert!(decode(&[]).is_err());
        assert!(decode(&[1]).is_err());
        // Symbol table present (empty) but header missing.
        assert!(decode(&[0, 0]).is_err());
        assert!(decode(&[0, 0, 1, 0]).is_err());
    }

    #[test]
    fn every_truncation_point_errors() {
        let strings: Vec<&[u8]> = vec![b"hello world"];
        let encoded = encode(&strings);
        for cut in 0..encoded.len() {
            assert!(
                decode(&encoded[..cut]).is_err(),
                "prefix of {cut} bytes should not decode"
            );
        }
    }

    #[test]
    fn escaped_literal_decodes_without_symbols() {
        let decoded = decode(&frame_without_symbols(&[ESCAPE, b'x'])).unwrap();
        assert_eq!(decoded, vec![b"x".to_vec()]);
    }

    #[test]
    fn dangling_escape_is_rejected() {
        assert!(decode(&frame_without_symbols(&[ESCAPE])).is_err());
    }

    #[test]
    fn unknown_symbol_index_is_rejected() {
        // Index 0 with an empty symbol table refers to nothing.
        assert!(decode(&frame_without_symbols(&[0])).is_err());
    }

    #[test]
    fn large_dataset() {
        let mut strings: Vec<Vec<u8>> = Vec::new();
        for i in 0..10_000 {
            strings.push(format!("key-{}-value-{}", i % 100, i % 50).into_bytes());
        }
        let refs: Vec<&[u8]> = strings.iter().map(|s| s.as_slice()).collect();
        assert_roundtrip(&refs);
    }
}