1#[cfg(feature = "nvcomp-gpu")]
27mod imp {
28 use std::sync::Arc;
29
30 use crate::ferro_compress::{Algo, BitcompDataType, Codec as FerroCodec, NvcompCodec};
31 use bytes::Bytes;
32
33 use crate::{ChunkManifest, Codec, CodecError, CodecKind};
34
    /// Codec that compresses and decompresses chunks through an [`NvcompCodec`]
    /// constructed with `Algo::Zstd`; reports itself as `CodecKind::NvcompZstd`.
    pub struct NvcompZstdCodec {
        // Held in an `Arc` so each compress/decompress call can hand a cheap
        // handle to the blocking task it spawns.
        inner: Arc<NvcompCodec>,
    }
39
40 impl NvcompZstdCodec {
41 pub fn new() -> Result<Self, CodecError> {
42 let inner = NvcompCodec::new(Algo::Zstd)
43 .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp zstd init: {e}")))?;
44 Ok(Self {
45 inner: Arc::new(inner),
46 })
47 }
48 }
49
50 #[async_trait::async_trait]
51 impl Codec for NvcompZstdCodec {
52 fn kind(&self) -> CodecKind {
53 CodecKind::NvcompZstd
54 }
55
56 async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
57 let original_size = input.len() as u64;
58 let original_crc = crc32c::crc32c(&input);
59 let codec = Arc::clone(&self.inner);
60 let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
61 let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
62 codec.compress(input.as_ref(), &mut out).map_err(|e| {
63 CodecError::Backend(anyhow::anyhow!("nvcomp zstd compress: {e}"))
64 })?;
65 Ok(out)
66 })
67 .await??;
68 let manifest = ChunkManifest {
69 codec: CodecKind::NvcompZstd,
70 original_size,
71 compressed_size: compressed.len() as u64,
72 crc32c: original_crc,
73 };
74 Ok((Bytes::from(compressed), manifest))
75 }
76
77 async fn decompress(
78 &self,
79 input: Bytes,
80 manifest: &ChunkManifest,
81 ) -> Result<Bytes, CodecError> {
82 if manifest.codec != CodecKind::NvcompZstd {
83 return Err(CodecError::CodecMismatch {
84 expected: CodecKind::NvcompZstd,
85 got: manifest.codec,
86 });
87 }
88 let expected_crc = manifest.crc32c;
89 let expected_orig_size = manifest.original_size as usize;
90 let codec = Arc::clone(&self.inner);
91 let decompressed =
92 tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
93 let mut out = Vec::with_capacity(expected_orig_size);
94 codec.decompress(input.as_ref(), &mut out).map_err(|e| {
95 CodecError::Backend(anyhow::anyhow!("nvcomp zstd decompress: {e}"))
96 })?;
97 Ok(out)
98 })
99 .await??;
100 if decompressed.len() != expected_orig_size {
101 return Err(CodecError::SizeMismatch {
102 expected: manifest.original_size,
103 got: decompressed.len() as u64,
104 });
105 }
106 let actual_crc = crc32c::crc32c(&decompressed);
107 if actual_crc != expected_crc {
108 return Err(CodecError::CrcMismatch {
109 expected: expected_crc,
110 got: actual_crc,
111 });
112 }
113 Ok(Bytes::from(decompressed))
114 }
115 }
116
    /// Codec that compresses and decompresses chunks through an [`NvcompCodec`]
    /// constructed with `Algo::Bitcomp`; reports itself as
    /// `CodecKind::NvcompBitcomp`.
    pub struct NvcompBitcompCodec {
        // Held in an `Arc` so each compress/decompress call can hand a cheap
        // handle to the blocking task it spawns.
        inner: Arc<NvcompCodec>,
    }
121
122 impl NvcompBitcompCodec {
123 pub fn new(data_type: BitcompDataType) -> Result<Self, CodecError> {
126 let inner = NvcompCodec::new(Algo::Bitcomp { data_type })
127 .map_err(|e| CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp init: {e}")))?;
128 Ok(Self {
129 inner: Arc::new(inner),
130 })
131 }
132
133 pub fn default_general() -> Result<Self, CodecError> {
135 Self::new(BitcompDataType::Char)
136 }
137 }
138
139 #[async_trait::async_trait]
140 impl Codec for NvcompBitcompCodec {
141 fn kind(&self) -> CodecKind {
142 CodecKind::NvcompBitcomp
143 }
144
145 async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError> {
146 let original_size = input.len() as u64;
147 let original_crc = crc32c::crc32c(&input);
148 let codec = Arc::clone(&self.inner);
149 let compressed = tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
150 let mut out = Vec::with_capacity(codec.max_compressed_len(input.len()));
151 codec.compress(input.as_ref(), &mut out).map_err(|e| {
152 CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp compress: {e}"))
153 })?;
154 Ok(out)
155 })
156 .await??;
157 let manifest = ChunkManifest {
158 codec: CodecKind::NvcompBitcomp,
159 original_size,
160 compressed_size: compressed.len() as u64,
161 crc32c: original_crc,
162 };
163 Ok((Bytes::from(compressed), manifest))
164 }
165
166 async fn decompress(
167 &self,
168 input: Bytes,
169 manifest: &ChunkManifest,
170 ) -> Result<Bytes, CodecError> {
171 if manifest.codec != CodecKind::NvcompBitcomp {
172 return Err(CodecError::CodecMismatch {
173 expected: CodecKind::NvcompBitcomp,
174 got: manifest.codec,
175 });
176 }
177 let expected_crc = manifest.crc32c;
178 let expected_orig_size = manifest.original_size as usize;
179 let codec = Arc::clone(&self.inner);
180 let decompressed =
181 tokio::task::spawn_blocking(move || -> Result<Vec<u8>, CodecError> {
182 let mut out = Vec::with_capacity(expected_orig_size);
183 codec.decompress(input.as_ref(), &mut out).map_err(|e| {
184 CodecError::Backend(anyhow::anyhow!("nvcomp bitcomp decompress: {e}"))
185 })?;
186 Ok(out)
187 })
188 .await??;
189 if decompressed.len() != expected_orig_size {
190 return Err(CodecError::SizeMismatch {
191 expected: manifest.original_size,
192 got: decompressed.len() as u64,
193 });
194 }
195 let actual_crc = crc32c::crc32c(&decompressed);
196 if actual_crc != expected_crc {
197 return Err(CodecError::CrcMismatch {
198 expected: expected_crc,
199 got: actual_crc,
200 });
201 }
202 Ok(Bytes::from(decompressed))
203 }
204 }
205
    /// Returns whatever `NvcompCodec::is_available()` reports.
    ///
    /// NOTE(review): presumably a runtime probe for a usable CUDA device (the
    /// tests in this file treat `false` as "no CUDA GPU detected at runtime");
    /// confirm against the `ferro_compress` implementation.
    pub fn is_gpu_available() -> bool {
        NvcompCodec::is_available()
    }
210}
211
212#[cfg(feature = "nvcomp-gpu")]
213pub use imp::{NvcompBitcompCodec, NvcompZstdCodec, is_gpu_available};
214
/// Fallback used when the `nvcomp-gpu` feature is disabled: always returns
/// `false`, so callers can ask the same question in builds that do not
/// compile the `imp` module.
#[cfg(not(feature = "nvcomp-gpu"))]
pub fn is_gpu_available() -> bool {
    false
}
219
#[cfg(all(test, feature = "nvcomp-gpu"))]
mod tests {
    use super::*;
    use crate::Codec;
    use bytes::Bytes;

    /// Round-trips 100 000 identical bytes through the zstd codec and expects
    /// better than 10x compression. Returns early when no GPU is reported.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_zstd_roundtrip() {
        let gpu_present = is_gpu_available();
        if !gpu_present {
            eprintln!("skipping: no CUDA GPU detected at runtime");
            return;
        }

        let zstd = NvcompZstdCodec::new().expect("init");
        let original = Bytes::from(vec![b'a'; 100_000]);

        let (packed, manifest) = zstd.compress(original.clone()).await.expect("compress");
        assert!(packed.len() < original.len() / 10);

        let restored = zstd.decompress(packed, &manifest).await.expect("decompress");
        assert_eq!(restored, original);
    }

    /// Round-trips 1024 little-endian `i64` values (0..1024) through the bitcomp
    /// codec and expects better than 2x compression. Returns early when no GPU
    /// is reported.
    #[tokio::test]
    #[ignore = "requires CUDA-capable GPU + NVCOMP_HOME at build time"]
    async fn nvcomp_bitcomp_roundtrip_on_integer_column() {
        let gpu_present = is_gpu_available();
        if !gpu_present {
            eprintln!("skipping: no CUDA GPU detected at runtime");
            return;
        }

        let bitcomp = NvcompBitcompCodec::default_general().expect("init");

        // 1024 values * 8 bytes each = 8192 bytes of monotonically increasing i64s.
        let mut column: Vec<u8> = Vec::with_capacity(8192);
        column.extend((0i64..1024).flat_map(i64::to_le_bytes));
        let original = Bytes::from(column);

        let (packed, manifest) = bitcomp
            .compress(original.clone())
            .await
            .expect("compress");
        assert!(
            packed.len() < original.len() / 2,
            "bitcomp on monotone i64 should compress >2x, got {} -> {}",
            original.len(),
            packed.len()
        );

        let restored = bitcomp
            .decompress(packed, &manifest)
            .await
            .expect("decompress");
        assert_eq!(restored, original);
    }
}