1#![allow(clippy::useless_conversion)]
5use std::sync::OnceLock;
23
24use bytes::Bytes;
25use pyo3::create_exception;
26use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
27use pyo3::prelude::*;
28use pyo3::types::PyBytes;
29use s4_codec_rs::{cpu_gzip, cpu_zstd, ChunkManifest, Codec, CodecError, CodecKind};
30use tokio::runtime::{Builder, Runtime};
31
// Python exception types raised by this module (registered on the module in `s4_codec`).
//
// `S4Error` subclasses `ValueError` and is the base of the validation / integrity
// errors below, so `except S4Error` catches all of them. `codec_err_to_py` also
// raises plain `S4Error` for truncated and over-length streams.
create_exception!(s4_codec, S4Error, PyValueError);
// Raised for `CodecError::CrcMismatch` (crc32c of the output differs from the expected value).
create_exception!(s4_codec, S4CrcMismatchError, S4Error);
// Raised for `CodecError::SizeMismatch` (output size differs from the expected size).
create_exception!(s4_codec, S4SizeMismatchError, S4Error);
// Raised for `CodecError::CodecMismatch` (manifest names a different codec).
create_exception!(s4_codec, S4CodecMismatchError, S4Error);
// Raised for `CodecError::UnregisteredCodec`.
create_exception!(s4_codec, S4UnregisteredCodecError, S4Error);
// Raised for `CodecError::ManifestSizeExceedsLimit` (manifest-declared size is above the limit).
create_exception!(s4_codec, S4ManifestSizeExceedsLimitError, S4Error);
// Raised for `CodecError::ManifestSizeMismatch` (manifest-declared size differs from the body).
create_exception!(s4_codec, S4ManifestSizeMismatchError, S4Error);
// Raised for `CodecError::Backend` and `CodecError::Join`. NOTE: derives from
// `RuntimeError`, not `S4Error`, so `except S4Error` does not catch it.
create_exception!(s4_codec, S4BackendError, PyRuntimeError);
// Raised for `CodecError::Io`. NOTE: derives from `IOError` (an alias of `OSError`
// in Python 3), not `S4Error`.
create_exception!(s4_codec, S4IoError, PyIOError);
61
62fn runtime() -> &'static Runtime {
63 static RT: OnceLock<Runtime> = OnceLock::new();
64 RT.get_or_init(|| {
65 Builder::new_multi_thread()
66 .enable_all()
67 .thread_name("s4-codec-py")
68 .build()
69 .expect("failed to start tokio runtime for s4_codec python binding")
70 })
71}
72
73fn codec_err_to_py(e: CodecError) -> PyErr {
74 use s4_codec_rs::CodecError::*;
75 match e {
76 SizeMismatch { expected, got } => {
77 S4SizeMismatchError::new_err(format!("size mismatch: expected {expected}, got {got}"))
78 }
79 CrcMismatch { expected, got } => S4CrcMismatchError::new_err(format!(
80 "crc32c mismatch: expected {expected:#010x}, got {got:#010x}"
81 )),
82 CodecMismatch { expected, got } => S4CodecMismatchError::new_err(format!(
83 "codec mismatch: expected {expected:?}, got {got:?}"
84 )),
85 UnregisteredCodec(k) => {
86 S4UnregisteredCodecError::new_err(format!("codec {k:?} not registered"))
87 }
88 ManifestSizeExceedsLimit { requested, limit } => S4ManifestSizeExceedsLimitError::new_err(
89 format!("manifest claims {requested} bytes but limit is {limit}"),
90 ),
91 ManifestSizeMismatch { manifest, actual } => S4ManifestSizeMismatchError::new_err(format!(
92 "manifest claims {manifest} bytes but body is {actual}"
93 )),
94 Backend(msg) => S4BackendError::new_err(format!("backend: {msg}")),
95 Io(e) => S4IoError::new_err(format!("io: {e}")),
96 TruncatedStream { expected, got } => S4Error::new_err(format!(
97 "stream truncated: expected {expected} input bytes, got {got}"
98 )),
99 OverlengthStream { expected, got } => S4Error::new_err(format!(
105 "stream over-length: expected {expected} input bytes, got at least {got}"
106 )),
107 Join(e) => S4BackendError::new_err(format!("backend (worker join): {e}")),
111 }
112}
113
114fn manifest_from_parts(
115 kind: CodecKind,
116 payload_len: u64,
117 original_size: u64,
118 crc32c: u32,
119) -> ChunkManifest {
120 ChunkManifest {
121 codec: kind,
122 original_size,
123 compressed_size: payload_len,
124 crc32c,
125 }
126}
127
128fn block_on<F, T>(py: Python<'_>, fut: F) -> T
132where
133 F: std::future::Future<Output = T> + Send,
134 T: Send,
135{
136 py.allow_threads(|| runtime().block_on(fut))
137}
138
139#[pyclass(name = "CpuZstd", module = "s4_codec")]
142struct PyCpuZstd {
143 inner: cpu_zstd::CpuZstd,
144}
145
146#[pymethods]
147impl PyCpuZstd {
148 #[new]
149 #[pyo3(signature = (level = 3))]
150 fn new(level: i32) -> Self {
151 Self {
152 inner: cpu_zstd::CpuZstd::new(level),
153 }
154 }
155
156 fn compress<'py>(
160 &self,
161 py: Python<'py>,
162 data: &Bound<'py, PyBytes>,
163 ) -> PyResult<(Bound<'py, PyBytes>, u64, u32)> {
164 let input = Bytes::copy_from_slice(data.as_bytes());
165 let codec = self.inner.clone();
166 let (out, manifest) =
167 block_on(py, async move { codec.compress(input).await }).map_err(codec_err_to_py)?;
168 Ok((
169 PyBytes::new(py, &out),
170 manifest.original_size,
171 manifest.crc32c,
172 ))
173 }
174
175 fn decompress<'py>(
178 &self,
179 py: Python<'py>,
180 data: &Bound<'py, PyBytes>,
181 original_size: u64,
182 crc32c: u32,
183 ) -> PyResult<Bound<'py, PyBytes>> {
184 let input = Bytes::copy_from_slice(data.as_bytes());
185 let manifest = manifest_from_parts(
186 CodecKind::CpuZstd,
187 input.len() as u64,
188 original_size,
189 crc32c,
190 );
191 let codec = self.inner.clone();
192 let out = block_on(py, async move { codec.decompress(input, &manifest).await })
193 .map_err(codec_err_to_py)?;
194 Ok(PyBytes::new(py, &out))
195 }
196
197 fn __repr__(&self) -> String {
198 format!("CpuZstd(level={})", cpu_zstd::CpuZstd::DEFAULT_LEVEL)
199 }
200}
201
202#[pyclass(name = "CpuGzip", module = "s4_codec")]
206struct PyCpuGzip {
207 inner: cpu_gzip::CpuGzip,
208}
209
210#[pymethods]
211impl PyCpuGzip {
212 #[new]
213 #[pyo3(signature = (level = 6))]
214 fn new(level: u32) -> Self {
215 Self {
216 inner: cpu_gzip::CpuGzip::new(level),
217 }
218 }
219
220 fn compress<'py>(
221 &self,
222 py: Python<'py>,
223 data: &Bound<'py, PyBytes>,
224 ) -> PyResult<(Bound<'py, PyBytes>, u64, u32)> {
225 let input = Bytes::copy_from_slice(data.as_bytes());
226 let codec = self.inner.clone();
227 let (out, manifest) =
228 block_on(py, async move { codec.compress(input).await }).map_err(codec_err_to_py)?;
229 Ok((
230 PyBytes::new(py, &out),
231 manifest.original_size,
232 manifest.crc32c,
233 ))
234 }
235
236 fn decompress<'py>(
237 &self,
238 py: Python<'py>,
239 data: &Bound<'py, PyBytes>,
240 original_size: u64,
241 crc32c: u32,
242 ) -> PyResult<Bound<'py, PyBytes>> {
243 let input = Bytes::copy_from_slice(data.as_bytes());
244 let manifest = manifest_from_parts(
245 CodecKind::CpuGzip,
246 input.len() as u64,
247 original_size,
248 crc32c,
249 );
250 let codec = self.inner.clone();
251 let out = block_on(py, async move { codec.decompress(input, &manifest).await })
252 .map_err(codec_err_to_py)?;
253 Ok(PyBytes::new(py, &out))
254 }
255
256 fn __repr__(&self) -> String {
257 format!("CpuGzip(level={})", cpu_gzip::CpuGzip::DEFAULT_LEVEL)
258 }
259}
260
261#[pyfunction]
264fn gpu_available() -> bool {
265 s4_codec_rs::nvcomp::is_gpu_available()
266}
267
268#[pymodule]
269fn s4_codec(py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
270 m.add_class::<PyCpuZstd>()?;
271 m.add_class::<PyCpuGzip>()?;
272 m.add_function(wrap_pyfunction!(gpu_available, m)?)?;
273 m.add("__version__", env!("CARGO_PKG_VERSION"))?;
274 m.add("S4Error", py.get_type::<S4Error>())?;
278 m.add("S4CrcMismatchError", py.get_type::<S4CrcMismatchError>())?;
279 m.add("S4SizeMismatchError", py.get_type::<S4SizeMismatchError>())?;
280 m.add(
281 "S4CodecMismatchError",
282 py.get_type::<S4CodecMismatchError>(),
283 )?;
284 m.add(
285 "S4UnregisteredCodecError",
286 py.get_type::<S4UnregisteredCodecError>(),
287 )?;
288 m.add(
289 "S4ManifestSizeExceedsLimitError",
290 py.get_type::<S4ManifestSizeExceedsLimitError>(),
291 )?;
292 m.add(
293 "S4ManifestSizeMismatchError",
294 py.get_type::<S4ManifestSizeMismatchError>(),
295 )?;
296 m.add("S4BackendError", py.get_type::<S4BackendError>())?;
297 m.add("S4IoError", py.get_type::<S4IoError>())?;
298 Ok(())
299}