Skip to main content

hdf5_reader/filters/
mod.rs

1pub mod deflate;
2pub mod fletcher32;
3#[cfg(feature = "lz4")]
4pub mod lz4;
5pub mod nbit;
6pub mod scaleoffset;
7pub mod shuffle;
8
9use std::collections::HashMap;
10
11use crate::error::{Error, Result};
12use crate::messages::filter_pipeline::FilterDescription;
13
14/// Standard HDF5 filter IDs.
15pub const FILTER_DEFLATE: u16 = 1;
16pub const FILTER_SHUFFLE: u16 = 2;
17pub const FILTER_FLETCHER32: u16 = 3;
18pub const FILTER_SZIP: u16 = 4;
19pub const FILTER_NBIT: u16 = 5;
20pub const FILTER_SCALEOFFSET: u16 = 6;
21/// HDF5 registered LZ4 filter.
22pub const FILTER_LZ4: u16 = 32004;
23
24/// A user-supplied filter function.
25///
26/// Takes the filter description, input data, and element size, then returns
27/// the decoded output.
28pub type FilterFn = Box<dyn Fn(&FilterDescription, &[u8], usize) -> Result<Vec<u8>> + Send + Sync>;
29
30/// A registry of filter implementations.
31///
32/// Comes pre-loaded with deflate, shuffle, and fletcher32. Users can register
33/// additional filters (e.g., Blosc, LZ4, ZFP) before reading datasets.
34pub struct FilterRegistry {
35    filters: HashMap<u16, FilterFn>,
36}
37
38impl FilterRegistry {
39    /// Create a new registry with the built-in filters pre-registered.
40    pub fn new() -> Self {
41        let mut registry = FilterRegistry {
42            filters: HashMap::new(),
43        };
44        registry.register(
45            FILTER_DEFLATE,
46            Box::new(|_, data, _| deflate::decompress(data)),
47        );
48        registry.register(
49            FILTER_SHUFFLE,
50            Box::new(|_, data, elem_size| Ok(shuffle::unshuffle(data, elem_size))),
51        );
52        registry.register(
53            FILTER_FLETCHER32,
54            Box::new(|_, data, _| fletcher32::verify_and_strip(data)),
55        );
56        registry.register(
57            FILTER_NBIT,
58            Box::new(|filter, data, _| nbit::decompress(data, &filter.client_data)),
59        );
60        registry.register(
61            FILTER_SCALEOFFSET,
62            Box::new(|filter, data, _| scaleoffset::decompress(data, &filter.client_data)),
63        );
64        #[cfg(feature = "lz4")]
65        registry.register(FILTER_LZ4, Box::new(|_, data, _| lz4::decompress(data)));
66        registry
67    }
68
69    /// Register a custom filter implementation for the given filter ID.
70    ///
71    /// Overwrites any previously registered filter with the same ID.
72    pub fn register(&mut self, id: u16, f: FilterFn) {
73        self.filters.insert(id, f);
74    }
75
76    /// Apply a single filter by ID.
77    pub fn apply(
78        &self,
79        filter: &FilterDescription,
80        data: &[u8],
81        element_size: usize,
82    ) -> Result<Vec<u8>> {
83        match self.filters.get(&filter.id) {
84            Some(f) => f(filter, data, element_size),
85            None => Err(Error::UnsupportedFilter(format!("filter id {}", filter.id))),
86        }
87    }
88}
89
90impl Default for FilterRegistry {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96/// Apply the filter pipeline in reverse (decompression direction) to a chunk.
97///
98/// HDF5 stores filters in the order they were applied during writing.
99/// On read, we apply them in reverse order.
100///
101/// If `registry` is `None`, the built-in filter set is used.
102///
103/// `filter_mask` is a bitmask where bit N being set means filter N should be skipped.
104pub fn apply_pipeline(
105    data: &[u8],
106    filters: &[FilterDescription],
107    filter_mask: u32,
108    element_size: usize,
109    registry: Option<&FilterRegistry>,
110) -> Result<Vec<u8>> {
111    // Count active filters to decide on single-buffer vs double-buffer strategy.
112    let active_count = filters
113        .iter()
114        .enumerate()
115        .rev()
116        .filter(|(i, _)| filter_mask & (1 << i) == 0)
117        .count();
118
119    if active_count == 0 {
120        return Ok(data.to_vec());
121    }
122
123    // For a single active filter, avoid the double-buffer overhead.
124    if active_count == 1 {
125        for (i, filter) in filters.iter().enumerate().rev() {
126            if filter_mask & (1 << i) != 0 {
127                continue;
128            }
129            return if let Some(reg) = registry {
130                reg.apply(filter, data, element_size)
131            } else {
132                apply_builtin_filter(filter, data, element_size)
133            };
134        }
135    }
136
137    // Multi-filter pipeline: the first stage reads from the borrowed input
138    // slice (avoiding a copy), subsequent stages consume the previous output.
139    // Each filter stage necessarily allocates (output sizes are unpredictable),
140    // but we avoid the initial data.to_vec() copy.
141    let mut owned: Option<Vec<u8>> = None;
142
143    for (i, filter) in filters.iter().enumerate().rev() {
144        if filter_mask & (1 << i) != 0 {
145            continue;
146        }
147
148        let input: &[u8] = match &owned {
149            Some(buf) => buf,
150            None => data,
151        };
152
153        owned = Some(if let Some(reg) = registry {
154            reg.apply(filter, input, element_size)?
155        } else {
156            apply_builtin_filter(filter, input, element_size)?
157        });
158    }
159
160    Ok(owned.unwrap_or_else(|| data.to_vec()))
161}
162
163fn apply_builtin_filter(
164    filter: &FilterDescription,
165    data: &[u8],
166    element_size: usize,
167) -> Result<Vec<u8>> {
168    match filter.id {
169        FILTER_DEFLATE => deflate::decompress(data),
170        FILTER_SHUFFLE => Ok(shuffle::unshuffle(data, element_size)),
171        FILTER_FLETCHER32 => fletcher32::verify_and_strip(data),
172        FILTER_SZIP => Err(Error::UnsupportedFilter("szip".into())),
173        FILTER_NBIT => nbit::decompress(data, &filter.client_data),
174        FILTER_SCALEOFFSET => scaleoffset::decompress(data, &filter.client_data),
175        #[cfg(feature = "lz4")]
176        FILTER_LZ4 => lz4::decompress(data),
177        id => Err(Error::UnsupportedFilter(format!("filter id {}", id))),
178    }
179}
180
181#[cfg(test)]
182mod tests {
183    use super::*;
184
185    #[test]
186    fn test_filter_registry_default() {
187        let registry = FilterRegistry::new();
188        // Built-in filters should be registered
189        assert!(registry.filters.contains_key(&FILTER_DEFLATE));
190        assert!(registry.filters.contains_key(&FILTER_SHUFFLE));
191        assert!(registry.filters.contains_key(&FILTER_FLETCHER32));
192        assert!(registry.filters.contains_key(&FILTER_NBIT));
193        assert!(registry.filters.contains_key(&FILTER_SCALEOFFSET));
194    }
195
196    #[test]
197    fn test_filter_registry_custom() {
198        let mut registry = FilterRegistry::new();
199        // Register a no-op custom filter
200        registry.register(32000, Box::new(|_, data, _| Ok(data.to_vec())));
201        let filter = FilterDescription {
202            id: 32000,
203            name: None,
204            client_data: Vec::new(),
205        };
206        let result = registry.apply(&filter, &[1, 2, 3], 1).unwrap();
207        assert_eq!(result, vec![1, 2, 3]);
208    }
209
210    #[test]
211    fn test_filter_registry_unknown() {
212        let registry = FilterRegistry::new();
213        let filter = FilterDescription {
214            id: 9999,
215            name: None,
216            client_data: Vec::new(),
217        };
218        let err = registry.apply(&filter, &[1, 2, 3], 1).unwrap_err();
219        assert!(matches!(err, Error::UnsupportedFilter(_)));
220    }
221}