use std::str::FromStr;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use thiserror::Error;
pub mod cpu_gzip;
pub mod cpu_zstd;
pub mod dietgpu;
pub mod dispatcher;
#[cfg(feature = "nvcomp-gpu")]
mod ferro_compress;
#[cfg(feature = "nvcomp-gpu")]
pub mod gpu_select;
pub mod index;
pub mod multipart;
pub mod nvcomp;
pub mod passthrough;
pub mod registry;
pub use registry::CodecRegistry;
/// Identifies which compression backend a chunk belongs to.
///
/// Each variant has three external spellings, kept in sync by hand:
///
/// * [`CodecKind::as_str`] and the [`FromStr`] impl: human-readable names;
/// * serde, via `rename_all = "kebab-case"` on this enum;
/// * [`CodecKind::id`] / [`CodecKind::from_id`]: a numeric id that does not depend on
///   declaration order.
///
/// The kebab-case rule spells `DietGpuAns` as `"diet-gpu-ans"` and `NvcompGDeflate` as
/// `"nvcomp-g-deflate"`, which differ from their `as_str` names. Serialization keeps the
/// kebab-case spelling, so previously written data is unchanged. The `alias` attributes let
/// deserialization also accept the `as_str` spelling.
///
/// NOTE(review): serde passes the variant *index* for unit variants, so non-self-describing
/// formats (e.g. bincode) depend on declaration order. Append new variants at the end rather
/// than reordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum CodecKind {
    /// `"passthrough"`, id 0.
    Passthrough,
    /// `"nvcomp-bitcomp"`, id 3.
    NvcompBitcomp,
    /// `"nvcomp-gans"`, id 4.
    NvcompGans,
    /// `"nvcomp-zstd"`, id 2.
    NvcompZstd,
    /// `"dietgpu-ans"`, id 5. Serialized by serde as `"diet-gpu-ans"`.
    #[serde(alias = "dietgpu-ans")]
    DietGpuAns,
    /// `"cpu-zstd"`, id 1.
    CpuZstd,
    /// `"nvcomp-gdeflate"`, id 6. Serialized by serde as `"nvcomp-g-deflate"`.
    #[serde(alias = "nvcomp-gdeflate")]
    NvcompGDeflate,
    /// `"cpu-gzip"`, id 7.
    CpuGzip,
}
impl CodecKind {
    /// Returns the canonical human-readable name of this codec.
    ///
    /// This is the spelling parsed by the [`FromStr`] impl. For two variants it differs from
    /// the serde (`kebab-case`) spelling: `"dietgpu-ans"` vs `"diet-gpu-ans"` and
    /// `"nvcomp-gdeflate"` vs `"nvcomp-g-deflate"`.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Passthrough => "passthrough",
            Self::NvcompBitcomp => "nvcomp-bitcomp",
            Self::NvcompGans => "nvcomp-gans",
            Self::NvcompZstd => "nvcomp-zstd",
            Self::DietGpuAns => "dietgpu-ans",
            Self::CpuZstd => "cpu-zstd",
            Self::NvcompGDeflate => "nvcomp-gdeflate",
            Self::CpuGzip => "cpu-gzip",
        }
    }

    /// Returns the numeric id of this codec.
    ///
    /// Ids are assigned explicitly here and do not follow declaration order.
    /// [`CodecKind::from_id`] is the exact inverse.
    ///
    /// NOTE(review): presumably these ids are persisted, so an existing id should never be
    /// renumbered or reused. Confirm with the readers of this value.
    #[must_use]
    pub const fn id(self) -> u32 {
        match self {
            Self::Passthrough => 0,
            Self::CpuZstd => 1,
            Self::NvcompZstd => 2,
            Self::NvcompBitcomp => 3,
            Self::NvcompGans => 4,
            Self::DietGpuAns => 5,
            Self::NvcompGDeflate => 6,
            Self::CpuGzip => 7,
        }
    }

    /// Maps an id produced by [`CodecKind::id`] back to its variant.
    ///
    /// Returns `None` for any id no variant uses.
    #[must_use]
    pub const fn from_id(id: u32) -> Option<Self> {
        match id {
            0 => Some(Self::Passthrough),
            1 => Some(Self::CpuZstd),
            2 => Some(Self::NvcompZstd),
            3 => Some(Self::NvcompBitcomp),
            4 => Some(Self::NvcompGans),
            5 => Some(Self::DietGpuAns),
            6 => Some(Self::NvcompGDeflate),
            7 => Some(Self::CpuGzip),
            _ => None,
        }
    }
}
#[derive(Debug, thiserror::Error)]
#[error("unknown codec kind: {0}")]
pub struct ParseCodecKindError(String);
impl FromStr for CodecKind {
    type Err = ParseCodecKindError;

    /// Parses a codec name as produced by [`CodecKind::as_str`].
    ///
    /// Also accepts the spellings serde emits for this enum (`rename_all = "kebab-case"`)
    /// for the two variants where they differ from `as_str`: `"diet-gpu-ans"` and
    /// `"nvcomp-g-deflate"`. A codec name copied out of a serialized manifest therefore
    /// parses back to the same variant.
    ///
    /// Matching is exact: case-sensitive and without trimming.
    ///
    /// # Errors
    ///
    /// Returns [`ParseCodecKindError`] carrying the input for any other string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let kind = match s {
            "passthrough" => Self::Passthrough,
            "nvcomp-bitcomp" => Self::NvcompBitcomp,
            "nvcomp-gans" => Self::NvcompGans,
            "nvcomp-zstd" => Self::NvcompZstd,
            // Canonical `as_str` name first, then the serde kebab-case spelling.
            "dietgpu-ans" | "diet-gpu-ans" => Self::DietGpuAns,
            "cpu-zstd" => Self::CpuZstd,
            "nvcomp-gdeflate" | "nvcomp-g-deflate" => Self::NvcompGDeflate,
            "cpu-gzip" => Self::CpuGzip,
            other => return Err(ParseCodecKindError(other.to_owned())),
        };
        Ok(kind)
    }
}
/// Per-chunk metadata returned by [`Codec::compress`] and passed back to
/// [`Codec::decompress`].
///
/// Field order is left as declared: the derived serde impls visit fields in declaration
/// order, which matters for non-self-describing formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChunkManifest {
/// Which codec the chunk belongs to.
pub codec: CodecKind,
/// NOTE(review): presumably the uncompressed length in bytes. Confirm in `Codec` impls.
pub original_size: u64,
/// NOTE(review): presumably the compressed payload length in bytes. `CodecError::SizeMismatch`
/// reports "manifest says N bytes, payload is M bytes", which suggests it is checked against
/// the payload length.
pub compressed_size: u64,
/// CRC-32C checksum of the chunk (see `CodecError::CrcMismatch`).
/// NOTE(review): this file does not show whether it covers the original or the compressed
/// bytes. Confirm in `Codec` impls.
pub crc32c: u32,
}
/// Metrics reported for a single compression call.
///
/// Build with [`CompressTelemetry::cpu`] or [`CompressTelemetry::gpu`], optionally chained
/// with [`CompressTelemetry::with_oom`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct CompressTelemetry {
    /// Codec name.
    /// NOTE(review): presumably a `CodecKind::as_str` value. Confirm at call sites.
    pub codec: &'static str,
    /// Number of input bytes.
    pub bytes_in: u64,
    /// Number of output bytes.
    pub bytes_out: u64,
    /// Seconds attributed to the GPU; `None` when built with [`CompressTelemetry::cpu`].
    pub gpu_seconds: Option<f64>,
    /// Set by [`CompressTelemetry::with_oom`]; `false` from both constructors.
    pub oom: bool,
}

impl CompressTelemetry {
    /// Telemetry for a CPU run: no GPU time and not flagged as OOM.
    #[must_use]
    pub fn cpu(codec: &'static str, bytes_in: u64, bytes_out: u64) -> Self {
        Self {
            codec,
            bytes_in,
            bytes_out,
            gpu_seconds: None,
            oom: false,
        }
    }

    /// Telemetry for a GPU run that took `seconds`; not flagged as OOM.
    #[must_use]
    pub fn gpu(codec: &'static str, bytes_in: u64, bytes_out: u64, seconds: f64) -> Self {
        Self {
            gpu_seconds: Some(seconds),
            ..Self::cpu(codec, bytes_in, bytes_out)
        }
    }

    /// Returns `self` with [`oom`](Self::oom) set to `true`.
    ///
    /// The type is `Copy` and this method takes `self` by value. Calling it as a bare
    /// statement (`t.with_oom();`) therefore leaves `t` unchanged, and `#[must_use]` turns
    /// that mistake into a warning.
    #[must_use = "with_oom returns the updated telemetry; it does not modify the original"]
    pub fn with_oom(mut self) -> Self {
        self.oom = true;
        self
    }
}
/// Heuristically reports whether `err` looks like an out-of-memory failure.
///
/// Each error in the `source()` chain, starting with `err` itself, is rendered with
/// `Display`, lowercased, and searched for `"out of memory"`, `"cudaerrormemoryallocation"`
/// or `"cuda_error_out_of_memory"`.
///
/// Walking the chain matters for [`CodecError::Backend`]. Its message embeds only the
/// outermost `anyhow` context, so an OOM reported by a wrapped inner cause (e.g. a CUDA error
/// with `.context(..)` added on top) would be missed by inspecting `err.to_string()` alone.
pub fn looks_like_oom(err: &CodecError) -> bool {
    const NEEDLES: [&str; 3] = [
        "out of memory",
        "cudaerrormemoryallocation",
        "cuda_error_out_of_memory",
    ];
    let top: &(dyn std::error::Error + 'static) = err;
    std::iter::successors(Some(top), |e| e.source()).any(|e| {
        let msg = e.to_string().to_ascii_lowercase();
        NEEDLES.iter().any(|needle| msg.contains(needle))
    })
}
/// Errors returned by [`Codec`] operations and by codec lookup in a `CodecRegistry`.
#[derive(Debug, Error)]
pub enum CodecError {
/// Two codec kinds that should agree do not.
/// NOTE(review): presumably the decoding codec's own kind vs `ChunkManifest::codec`.
/// Confirm at raise sites which side is `expected`.
#[error("codec mismatch: expected {expected:?}, got {got:?}")]
CodecMismatch { expected: CodecKind, got: CodecKind },
/// CRC-32C check failed. Both values are shown as `0x`-prefixed, zero-padded 8-digit hex.
#[error("crc32c mismatch (chunk corruption?): expected {expected:#010x}, got {got:#010x}")]
CrcMismatch { expected: u32, got: u32 },
/// The manifest's byte count (`expected`) disagrees with the payload length (`got`).
#[error("compressed size mismatch: manifest says {expected} bytes, payload is {got} bytes")]
SizeMismatch { expected: u64, got: u64 },
/// Any backend failure carried as `anyhow::Error`. `#[from]` enables `?` conversion and
/// exposes the error via `source()`. The message embeds only the outermost anyhow context;
/// walk `source()` to see the full cause chain.
#[error("compression backend error: {0}")]
Backend(#[from] anyhow::Error),
/// I/O failure, converted via `#[from]`.
#[error("io error: {0}")]
Io(#[from] std::io::Error),
/// A tokio task could not be joined (it panicked or was cancelled), converted via `#[from]`.
/// NOTE(review): presumably raised for `spawn_blocking` work, per the message text.
#[error("blocking-task join error: {0}")]
Join(#[from] tokio::task::JoinError),
/// The requested codec kind has no entry in the `CodecRegistry`.
#[error("codec {0:?} is not registered in this CodecRegistry")]
UnregisteredCodec(CodecKind),
}
/// A compression backend: turns a chunk into a (payload, manifest) pair and back.
///
/// `#[async_trait]` rewrites the async methods to return boxed `Send` futures, which keeps
/// the trait object-safe (usable as `dyn Codec`). Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait Codec: Send + Sync {
/// The codec kind this implementation provides.
fn kind(&self) -> CodecKind;
/// Compresses `input`, returning the compressed payload and the [`ChunkManifest`]
/// describing it.
async fn compress(&self, input: Bytes) -> Result<(Bytes, ChunkManifest), CodecError>;
/// Decompresses `input` using its `manifest` (as returned by [`Codec::compress`]).
/// NOTE(review): implementations presumably validate `manifest` and report
/// `CodecMismatch` / `SizeMismatch` / `CrcMismatch`. Confirm in each impl.
async fn decompress(&self, input: Bytes, manifest: &ChunkManifest)
-> Result<Bytes, CodecError>;
}
pub use dispatcher::CodecDispatcher;