prestige 0.3.1

Prestige file reading and writing utilities and tools
Documentation
use std::path::{Path, PathBuf};
use thiserror::Error;

/// Result alias whose error type is fixed to this module's [`Error`].
///
/// The success type defaults to `()`, so a bare `Result` means
/// `std::result::Result<(), Error>`.
pub type Result<T = ()> = std::result::Result<T, Error>;

/// Error type used by the [`Result`] alias in this module.
///
/// Variants marked `#[from]` get a derived `From` conversion from the wrapped
/// type, so those errors can be propagated with `?`. Variants carrying a
/// plain `String` have no derived conversion and must be built explicitly.
#[derive(Debug, Error)]
pub enum Error {
    /// Wraps an [`AwsError`].
    #[error("aws error: {0}")]
    Aws(#[from] AwsError),

    /// Wraps a `config::ConfigError`.
    #[error("prestige configuration error: {0}")]
    Config(#[from] config::ConfigError),

    /// Wraps a `parquet::errors::ParquetError`.
    #[error("parquet error: {0}")]
    Parquet(#[from] parquet::errors::ParquetError),

    /// Wraps an `arrow::error::ArrowError`.
    #[error("arrow error: {0}")]
    Arrow(#[from] arrow::error::ArrowError),

    /// Wraps a [`FileMetaError`].
    #[error("file meta error: {0}")]
    FileMeta(#[from] FileMetaError),

    /// Wraps a [`ChannelError`].
    #[error("channel error: {0}")]
    Channel(#[from] ChannelError),

    /// Wraps a `std::io::Error`.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Carries a message string shown after the "serde arrow error" prefix.
    /// No `From` conversion is derived for this variant.
    #[error("serde arrow error: {0}")]
    SerdeArrow(String),

    /// Wraps an `iceberg::Error`. Only compiled with the `iceberg` feature.
    #[cfg(feature = "iceberg")]
    #[error("iceberg error: {0}")]
    Iceberg(#[from] iceberg::Error),

    /// Carries a message string shown after the "catalog http error" prefix.
    /// Only compiled with the `iceberg` feature.
    #[cfg(feature = "iceberg")]
    #[error("catalog http error: {0}")]
    CatalogHttp(String),

    /// Carries a message string shown after the "branch error" prefix.
    /// Only compiled with the `iceberg` feature.
    #[cfg(feature = "iceberg")]
    #[error("branch error: {0}")]
    Branch(String),

    /// Wraps a `sqlx::Error`. Only compiled with the `sqlx` feature.
    #[cfg(feature = "sqlx")]
    #[error("db error: {0}")]
    Db(#[from] sqlx::Error),

    /// Wraps a [`CompactionError`].
    #[error("compaction error: {0}")]
    Compaction(#[from] CompactionError),

    /// Carries a message string shown after the "internal error" prefix.
    /// No `From` conversion is derived for this variant.
    #[error("internal error: {0}")]
    Internal(String),
}

/// Errors originating from the `aws_sdk_s3` crate.
#[derive(thiserror::Error, Debug)]
pub enum AwsError {
    /// A boxed `aws_sdk_s3::Error`.
    ///
    /// This variant has no `#[from]`, so no `From` conversion is derived for
    /// it here. NOTE(review): presumably boxed to keep the enum small — confirm.
    #[error("s3: {0}")]
    S3(Box<aws_sdk_s3::Error>),

    /// Wraps an `aws_sdk_s3::primitives::ByteStreamError`; a `From`
    /// conversion into `AwsError` is derived via `#[from]`.
    #[error("streaming: {0}")]
    Streaming(#[from] aws_sdk_s3::primitives::ByteStreamError),
}

impl AwsError {
    pub fn s3_error<T>(err: T) -> Error
    where
        T: Into<aws_sdk_s3::Error>,
    {
        Error::from(err.into())
    }
}

/// Errors reported as "file meta" failures (see [`Error::FileMeta`]).
#[derive(thiserror::Error, Debug)]
pub enum FileMetaError {
    /// Carries an `i64` that is shown in the message.
    /// NOTE(review): presumably the rejected timestamp value — confirm at call sites.
    #[error("invalid timestamp: {0}")]
    InvalidTimestamp(i64),

    /// Wraps a `std::num::ParseIntError`; a `From` conversion is derived.
    #[error("invalid timestamp string: {0}")]
    TimestampStr(#[from] std::num::ParseIntError),

    /// Carries a string that is shown in the message.
    /// NOTE(review): presumably the filename that failed to match — confirm at call sites.
    #[error("filename did not match regex: {0}")]
    Regex(String),

    /// No file name was found; carries no data.
    #[error("no file name found")]
    MissingFilename,

    /// Wraps a `std::io::Error`; a `From` conversion is derived.
    #[error("IO: {0}")]
    Io(#[from] std::io::Error),
}

/// Channel-related failures (see [`Error::Channel`]).
///
/// Each variant's fields are interpolated into its display message.
#[derive(Error, Debug)]
pub enum ChannelError {
    /// A send failed; the message names the `prefix` and `process`.
    #[error("failed to send {prefix} for process {process}")]
    PollerSendError { prefix: String, process: String },

    /// The channel of the sink called `name` was closed.
    #[error("channel closed sink {name}")]
    SinkClosed { name: String },

    /// A timeout occurred for the sink called `name`.
    #[error("timeout for sink {name}")]
    SinkTimeout { name: String },

    /// The channel for the upload of `path` was closed.
    #[error("channel closed for upload {path}")]
    UploadClosed { path: PathBuf },
}

impl ChannelError {
    pub fn poller_send_error(prefix: &str, process: &str) -> Error {
        Error::Channel(Self::PollerSendError {
            prefix: prefix.to_owned(),
            process: process.to_owned(),
        })
    }

    pub fn sink_closed(name: &str) -> Error {
        Error::Channel(Self::SinkClosed {
            name: name.to_owned(),
        })
    }

    pub fn sink_timeout(name: &str) -> Error {
        Error::Channel(Self::SinkTimeout {
            name: name.to_owned(),
        })
    }

    pub fn upload_closed(path: &Path) -> Error {
        Error::Channel(Self::UploadClosed {
            path: path.to_owned(),
        })
    }
}

/// Compaction failures (see [`Error::Compaction`]).
#[derive(Error, Debug)]
pub enum CompactionError {
    /// An upload failed; `file_key` is included in the message.
    #[error("upload failed for {file_key}")]
    UploadFailed { file_key: String },

    /// No source files were provided for compaction; carries no data.
    #[error("no source files provided for compaction")]
    NoSourceFiles,
}

/// Converts an `aws_sdk_s3::Error` into [`Error::Aws`] by boxing it inside
/// [`AwsError::S3`].
impl From<aws_sdk_s3::Error> for Error {
    fn from(err: aws_sdk_s3::Error) -> Self {
        let boxed = Box::new(err);
        // `AwsError -> Error` is the `#[from]` conversion on `Error::Aws`.
        AwsError::S3(boxed).into()
    }
}

/// Converts an `aws_sdk_s3::primitives::ByteStreamError` into [`Error::Aws`]
/// by wrapping it in [`AwsError::Streaming`].
impl From<aws_sdk_s3::primitives::ByteStreamError> for Error {
    fn from(err: aws_sdk_s3::primitives::ByteStreamError) -> Self {
        Self::Aws(AwsError::Streaming(err))
    }
}