1pub mod deflate;
2pub mod fletcher32;
3#[cfg(feature = "lz4")]
4pub mod lz4;
5pub mod nbit;
6pub mod scaleoffset;
7pub mod shuffle;
8
9use std::collections::HashMap;
10
11use crate::error::{Error, Result};
12use crate::messages::filter_pipeline::FilterDescription;
13
/// Identifier of the deflate filter (decoded by `deflate::decompress`).
pub const FILTER_DEFLATE: u16 = 1;
/// Identifier of the byte-shuffle filter (undone by `shuffle::unshuffle`).
pub const FILTER_SHUFFLE: u16 = 2;
/// Identifier of the Fletcher-32 checksum filter (handled by `fletcher32::verify_and_strip`).
pub const FILTER_FLETCHER32: u16 = 3;
/// Identifier of the szip filter; recognised by the built-in dispatcher but rejected as unsupported.
pub const FILTER_SZIP: u16 = 4;
/// Identifier of the n-bit filter (decoded by `nbit::decompress`).
pub const FILTER_NBIT: u16 = 5;
/// Identifier of the scale-offset filter (decoded by `scaleoffset::decompress`).
pub const FILTER_SCALEOFFSET: u16 = 6;
/// Identifier of the LZ4 filter; only dispatched when the `lz4` feature is enabled.
pub const FILTER_LZ4: u16 = 32_004;
23
24pub type FilterFn = Box<dyn Fn(&FilterDescription, &[u8], usize) -> Result<Vec<u8>> + Send + Sync>;
29
30pub struct FilterRegistry {
35 filters: HashMap<u16, FilterFn>,
36}
37
38impl FilterRegistry {
39 pub fn new() -> Self {
41 let mut registry = FilterRegistry {
42 filters: HashMap::new(),
43 };
44 registry.register(
45 FILTER_DEFLATE,
46 Box::new(|_, data, _| deflate::decompress(data)),
47 );
48 registry.register(
49 FILTER_SHUFFLE,
50 Box::new(|_, data, elem_size| Ok(shuffle::unshuffle(data, elem_size))),
51 );
52 registry.register(
53 FILTER_FLETCHER32,
54 Box::new(|_, data, _| fletcher32::verify_and_strip(data)),
55 );
56 registry.register(
57 FILTER_NBIT,
58 Box::new(|filter, data, _| nbit::decompress(data, &filter.client_data)),
59 );
60 registry.register(
61 FILTER_SCALEOFFSET,
62 Box::new(|filter, data, _| scaleoffset::decompress(data, &filter.client_data)),
63 );
64 #[cfg(feature = "lz4")]
65 registry.register(FILTER_LZ4, Box::new(|_, data, _| lz4::decompress(data)));
66 registry
67 }
68
69 pub fn register(&mut self, id: u16, f: FilterFn) {
73 self.filters.insert(id, f);
74 }
75
76 pub fn apply(
78 &self,
79 filter: &FilterDescription,
80 data: &[u8],
81 element_size: usize,
82 ) -> Result<Vec<u8>> {
83 match self.filters.get(&filter.id) {
84 Some(f) => f(filter, data, element_size),
85 None => Err(Error::UnsupportedFilter(format!("filter id {}", filter.id))),
86 }
87 }
88}
89
90impl Default for FilterRegistry {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96pub fn apply_pipeline(
105 data: &[u8],
106 filters: &[FilterDescription],
107 filter_mask: u32,
108 element_size: usize,
109 registry: Option<&FilterRegistry>,
110) -> Result<Vec<u8>> {
111 let active_count = filters
113 .iter()
114 .enumerate()
115 .rev()
116 .filter(|(i, _)| filter_mask & (1 << i) == 0)
117 .count();
118
119 if active_count == 0 {
120 return Ok(data.to_vec());
121 }
122
123 if active_count == 1 {
125 for (i, filter) in filters.iter().enumerate().rev() {
126 if filter_mask & (1 << i) != 0 {
127 continue;
128 }
129 return if let Some(reg) = registry {
130 reg.apply(filter, data, element_size)
131 } else {
132 apply_builtin_filter(filter, data, element_size)
133 };
134 }
135 }
136
137 let mut owned: Option<Vec<u8>> = None;
142
143 for (i, filter) in filters.iter().enumerate().rev() {
144 if filter_mask & (1 << i) != 0 {
145 continue;
146 }
147
148 let input: &[u8] = match &owned {
149 Some(buf) => buf,
150 None => data,
151 };
152
153 owned = Some(if let Some(reg) = registry {
154 reg.apply(filter, input, element_size)?
155 } else {
156 apply_builtin_filter(filter, input, element_size)?
157 });
158 }
159
160 Ok(owned.unwrap_or_else(|| data.to_vec()))
161}
162
163fn apply_builtin_filter(
164 filter: &FilterDescription,
165 data: &[u8],
166 element_size: usize,
167) -> Result<Vec<u8>> {
168 match filter.id {
169 FILTER_DEFLATE => deflate::decompress(data),
170 FILTER_SHUFFLE => Ok(shuffle::unshuffle(data, element_size)),
171 FILTER_FLETCHER32 => fletcher32::verify_and_strip(data),
172 FILTER_SZIP => Err(Error::UnsupportedFilter("szip".into())),
173 FILTER_NBIT => nbit::decompress(data, &filter.client_data),
174 FILTER_SCALEOFFSET => scaleoffset::decompress(data, &filter.client_data),
175 #[cfg(feature = "lz4")]
176 FILTER_LZ4 => lz4::decompress(data),
177 id => Err(Error::UnsupportedFilter(format!("filter id {}", id))),
178 }
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a nameless, parameterless filter description for `id`.
    fn describe(id: u16) -> FilterDescription {
        FilterDescription {
            id,
            name: None,
            client_data: Vec::new(),
        }
    }

    /// A fresh registry contains every always-on built-in filter.
    #[test]
    fn test_filter_registry_default() {
        let registry = FilterRegistry::new();
        let always_registered = [
            FILTER_DEFLATE,
            FILTER_SHUFFLE,
            FILTER_FLETCHER32,
            FILTER_NBIT,
            FILTER_SCALEOFFSET,
        ];
        for id in always_registered.iter() {
            assert!(
                registry.filters.contains_key(id),
                "built-in filter id {} is not registered",
                id
            );
        }
    }

    /// A user-registered pass-through filter is found and applied.
    #[test]
    fn test_filter_registry_custom() {
        let mut registry = FilterRegistry::new();
        registry.register(32000, Box::new(|_, bytes, _| Ok(bytes.to_vec())));

        let output = registry.apply(&describe(32000), &[1, 2, 3], 1).unwrap();
        assert_eq!(output, vec![1, 2, 3]);
    }

    /// An id with no registered callback is reported as unsupported.
    #[test]
    fn test_filter_registry_unknown() {
        let outcome = FilterRegistry::new().apply(&describe(9999), &[1, 2, 3], 1);
        assert!(matches!(outcome, Err(Error::UnsupportedFilter(_))));
    }
}