#[cfg(feature = "nvcomp-gpu")]
mod imp {
    //! nvCOMP-backed implementations of the crate's `Codec` trait.
    //!
    //! Every codec wraps a shared `NvcompCodec` handle and delegates to the
    //! `compress_blocking` / `decompress_blocking` helpers below. Those helpers
    //! run the backend call *and* all CRC32C work inside
    //! `tokio::task::spawn_blocking`, so large chunks never occupy an async
    //! executor thread.

    use std::sync::Arc;

    use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
    use bytes::Bytes;

    use crate::{ChunkManifest, Codec, CodecError, CodecKind};

    /// Creates the shared backend handle for `algo`.
    ///
    /// `label` names the algorithm in the error message (e.g. `"zstd"`).
    ///
    /// # Errors
    /// `CodecError::Backend` if `NvcompCodec::new` fails.
    fn init_backend(algo: Algo, label: &str) -> Result<Arc<NvcompCodec>, CodecError> {
        NvcompCodec::new(algo)
            .map(Arc::new)
            .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp {label} init: {e}")))
    }

    /// Compresses `input` with `backend` and builds a manifest tagged `kind`.
    ///
    /// The manifest records the original length and the CRC32C of the
    /// *uncompressed* bytes. `decompress_blocking` uses those values to verify
    /// the round trip. The checksum and the compression both run on the
    /// blocking pool.
    ///
    /// # Errors
    /// - `CodecError::Backend` if the backend rejects the input.
    /// - A `JoinError` from the blocking task is converted into `CodecError` via `?`.
    async fn compress_blocking(
        backend: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
    ) -> Result<(Bytes, ChunkManifest), CodecError> {
        let original_size = input.len() as u64;
        let (compressed, original_crc) =
            tokio::task::spawn_blocking(move || -> Result<(Vec<u8>, u32), CodecError> {
                // Checksum the whole chunk here rather than on the async thread.
                let crc = crc32c::crc32c(&input);
                let mut out = Vec::with_capacity(backend.max_compressed_len(input.len()));
                backend.compress(input.as_ref(), &mut out).map_err(|e| {
                    CodecError::Backend(anyhow::anyhow!("nvcomp {label} compress: {e}"))
                })?;
                Ok((out, crc))
            })
            .await??;
        let manifest = ChunkManifest {
            codec: kind,
            original_size,
            compressed_size: compressed.len() as u64,
            crc32c: original_crc,
        };
        Ok((Bytes::from(compressed), manifest))
    }

    /// Decompresses `input` with `backend` and verifies it against `manifest`.
    ///
    /// Checks run in this order:
    /// 1. The manifest must have been produced by codec `kind`.
    /// 2. The decompressed length must equal `manifest.original_size`.
    /// 3. The CRC32C of the output must equal `manifest.crc32c`.
    ///
    /// Steps 2 and 3 run on the blocking pool together with the decompression.
    ///
    /// # Errors
    /// - `CodecError::CodecMismatch` if the manifest names a different codec.
    /// - `CodecError::Backend` if the backend fails.
    /// - `CodecError::SizeMismatch` if the output length is wrong.
    /// - `CodecError::CrcMismatch` if the checksum does not match.
    /// - A `JoinError` from the blocking task is converted into `CodecError` via `?`.
    async fn decompress_blocking(
        backend: Arc<NvcompCodec>,
        kind: CodecKind,
        label: &'static str,
        input: Bytes,
        manifest: &ChunkManifest,
    ) -> Result<Bytes, CodecError> {
        if manifest.codec != kind {
            return Err(CodecError::CodecMismatch {
                expected: kind,
                got: manifest.codec,
            });
        }
        let expected_size = manifest.original_size;
        let expected_crc = manifest.crc32c;
        // Pre-allocation hint only. The length check below compares in u64, so a
        // size that does not fit in `usize` cannot be truncated into a false match.
        let capacity_hint = usize::try_from(expected_size).unwrap_or(0);
        let decompressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
            let mut out = Vec::with_capacity(capacity_hint);
            backend.decompress(input.as_ref(), &mut out).map_err(|e| {
                CodecError::Backend(anyhow::anyhow!("nvcomp {label} decompress: {e}"))
            })?;
            let got = out.len() as u64;
            if got != expected_size {
                return Err(CodecError::SizeMismatch {
                    expected: expected_size,
                    got,
                });
            }
            let actual_crc = crc32c::crc32c(&out);
            if actual_crc != expected_crc {
                return Err(CodecError::CrcMismatch {
                    expected: expected_crc,
                    got: actual_crc,
                });
            }
            Ok(out)
        })
        .await??;
        Ok(Bytes::from(decompressed))
    }

    /// Zstandard codec running on nvCOMP.
    pub struct NvcompZstdCodec {
        /// Shared backend handle; cloned into each blocking task.
        inner: Arc<NvcompCodec>,
    }

    impl NvcompZstdCodec {
        /// Creates an nvCOMP Zstd codec.
        ///
        /// # Errors
        /// `CodecError::Backend` if nvCOMP fails to initialise the algorithm.
        pub fn new() -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::Zstd, "zstd")?,
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompZstdCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompZstd
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_blocking(Arc::clone(&self.inner), CodecKind::NvcompZstd, "zstd", input).await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompZstd,
                "zstd",
                input,
                manifest,
            )
            .await
        }
    }

    /// Bitcomp codec running on nvCOMP.
    pub struct NvcompBitcompCodec {
        /// Shared backend handle; cloned into each blocking task.
        inner: Arc<NvcompCodec>,
    }

    impl NvcompBitcompCodec {
        /// Creates an nvCOMP Bitcomp codec for the given element `data_type`.
        ///
        /// # Errors
        /// `CodecError::Backend` if nvCOMP fails to initialise the algorithm.
        pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::Bitcomp { data_type }, "bitcomp")?,
            })
        }

        /// Creates a Bitcomp codec configured with `BitcompDataType::Char`.
        ///
        /// # Errors
        /// Same as [`NvcompBitcompCodec::new`].
        pub fn default_general() -> Result<Self, CodecError> {
            Self::new(BitcompDataType::Char)
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompBitcompCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompBitcomp
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompBitcomp,
                "bitcomp",
                input,
                manifest,
            )
            .await
        }
    }

    /// GDeflate codec running on nvCOMP.
    pub struct NvcompGDeflateCodec {
        /// Shared backend handle; cloned into each blocking task.
        inner: Arc<NvcompCodec>,
    }

    impl NvcompGDeflateCodec {
        /// Creates an nvCOMP GDeflate codec.
        ///
        /// # Errors
        /// `CodecError::Backend` if nvCOMP fails to initialise the algorithm.
        pub fn new() -> Result<Self, CodecError> {
            Ok(Self {
                inner: init_backend(Algo::GDeflate, "gdeflate")?,
            })
        }
    }

    #[async_trait::async_trait]
    impl Codec for NvcompGDeflateCodec {
        fn kind(&self) -> CodecKind {
            CodecKind::NvcompGDeflate
        }

        async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
            compress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
            )
            .await
        }

        async fn decompress(
            &self,
            input: Bytes,
            manifest: &ChunkManifest,
        ) -> Result<Bytes, CodecError> {
            decompress_blocking(
                Arc::clone(&self.inner),
                CodecKind::NvcompGDeflate,
                "gdeflate",
                input,
                manifest,
            )
            .await
        }
    }

    /// Returns whether the nvCOMP backend reports itself usable at runtime.
    ///
    /// This delegates to `NvcompCodec::is_available()`.
    pub fn is_gpu_available() -> bool {
        NvcompCodec::is_available()
    }
}
296
297#[cfg(feature = "nvcomp-gpu")]
298pub use imp::{NvcompBitcompCodec, NvcompGDeflateCodec, NvcompZstdCodec, is_gpu_available};
299
/// Reports whether an nvCOMP GPU backend can be used.
///
/// This variant is compiled only when the `nvcomp-gpu` feature is disabled.
/// No GPU codec exists in that build, so the answer is unconditionally `false`.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    // Without the feature there is nothing to probe.
    false
}
304
#[cfg(all(test, feature = "nvcomp-gpu"))]
mod tests {
    use super::*;
    use crate::{ChunkManifest, Codec, CodecError};
    use bytes::Bytes;

    /// Returns `true` when a GPU is visible. Otherwise it logs why the test is
    /// being skipped and returns `false`, so GPU tests can bail out early.
    fn gpu_present() -> bool {
        let present = is_gpu_available();
        if !present {
            eprintln!("skipping: no CUDA GPU detected at runtime");
        }
        present
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_roundtrip() {
        if !gpu_present() {
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        let input = Bytes::from(vec![b'a'; 100_000]);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        assert!(compressed.len() < input.len() / 10);
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
        if !gpu_present() {
            return;
        }
        let codec = NvcompBitcompCodec::default_general().expect("init");
        let mut payload: Vec<u8> = Vec::with_capacity(8192);
        for i in 0i64..1024 {
            payload.extend_from_slice(&i.to_le_bytes());
        }
        let input = Bytes::from(payload);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        assert!(
            compressed.len() < input.len() / 2,
            "bitcomp on monotone i64 should compress >2x, got {} -> {}",
            input.len(),
            compressed.len()
        );
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_gdeflate_roundtrip() {
        if !gpu_present() {
            return;
        }
        let codec = NvcompGDeflateCodec::new().expect("init");
        let input = Bytes::from(vec![b'a'; 100_000]);
        let (compressed, manifest) = codec.compress(input.clone()).await.expect("compress");
        assert!(compressed.len() < input.len());
        let decompressed = codec
            .decompress(compressed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(decompressed, input);
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_rejects_corrupted_crc() {
        if !gpu_present() {
            return;
        }
        let codec = NvcompZstdCodec::new().expect("init");
        let input = Bytes::from_static(b"hello nvcomp, hello nvcomp, hello nvcomp");
        let (compressed, manifest) = codec.compress(input).await.expect("compress");
        // Flip one bit of the recorded checksum; everything else stays valid.
        let tampered = ChunkManifest {
            codec: manifest.codec,
            original_size: manifest.original_size,
            compressed_size: manifest.compressed_size,
            crc32c: manifest.crc32c ^ 1,
        };
        let err = codec
            .decompress(compressed, &tampered)
            .await
            .expect_err("a CRC mismatch must be reported");
        assert!(
            matches!(err, CodecError::CrcMismatch { .. }),
            "unexpected error: {err:?}"
        );
    }

    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_rejects_foreign_manifest() {
        if !gpu_present() {
            return;
        }
        let zstd = NvcompZstdCodec::new().expect("init zstd");
        let gdeflate = NvcompGDeflateCodec::new().expect("init gdeflate");
        let input = Bytes::from(vec![b'z'; 4096]);
        let (compressed, manifest) = zstd.compress(input).await.expect("compress");
        let err = gdeflate
            .decompress(compressed, &manifest)
            .await
            .expect_err("a codec mismatch must be reported");
        assert!(
            matches!(err, CodecError::CodecMismatch { .. }),
            "unexpected error: {err:?}"
        );
    }
}