vuke 0.9.0

Research tool for studying vulnerable Bitcoin key generation practices
Documentation
use std::collections::HashMap;

use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Literal, Struct};
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
use iceberg_catalog_rest::RestCatalogBuilder;

use super::error::IcebergError;
use super::partition::build_partition_spec;
use super::schema::build_iceberg_schema;
use super::{FileMetadata, IcebergConfig, SnapshotInfo};
use crate::storage::CloudCredentials;

pub struct RestCatalogClient {
    config: IcebergConfig,
    credentials: CloudCredentials,
}

impl RestCatalogClient {
    pub fn new(config: IcebergConfig, credentials: CloudCredentials) -> Self {
        Self {
            config,
            credentials,
        }
    }

    pub async fn register_parquet_files(
        &self,
        files: &[FileMetadata],
    ) -> Result<SnapshotInfo, IcebergError> {
        if files.is_empty() {
            return Ok(SnapshotInfo {
                snapshot_id: 0,
                files_registered: 0,
            });
        }

        let catalog = self.build_catalog().await?;
        let namespace = NamespaceIdent::from_strs([&self.config.namespace])
            .map_err(|e| IcebergError::InvalidConfig(format!("invalid namespace: {e}")))?;

        self.ensure_namespace_exists(&catalog, &namespace).await?;

        let table_ident = TableIdent::new(namespace.clone(), self.config.table_name.clone());

        let table = match catalog.table_exists(&table_ident).await {
            Ok(true) => catalog
                .load_table(&table_ident)
                .await
                .map_err(|e| IcebergError::CatalogConnection(format!("load table: {e}")))?,
            Ok(false) => self.create_table(&catalog, &namespace).await?,
            Err(e) => return Err(IcebergError::CatalogConnection(format!("check table: {e}"))),
        };

        let partition_spec_id = table.metadata().default_partition_spec_id();

        let data_files: Vec<_> = files
            .iter()
            .map(|file| {
                let partition = match &file.partition_values {
                    Some(pv) => Struct::from_iter([
                        Some(Literal::string(&pv.transform)),
                        Some(Literal::int(pv.timestamp_day)),
                    ]),
                    None => Struct::from_iter([None, None]),
                };

                DataFileBuilder::default()
                    .content(DataContentType::Data)
                    .file_path(file.uri.clone())
                    .file_format(DataFileFormat::Parquet)
                    .file_size_in_bytes(file.file_size)
                    .record_count(file.record_count)
                    .partition_spec_id(partition_spec_id)
                    .partition(partition)
                    .build()
            })
            .collect::<std::result::Result<Vec<_>, _>>()
            .map_err(|e| IcebergError::SnapshotCommit(format!("build data file: {e}")))?;

        let tx = Transaction::new(&table);
        let append_action = tx.fast_append().add_data_files(data_files);
        let tx = append_action
            .apply(tx)
            .map_err(|e| IcebergError::SnapshotCommit(format!("apply append: {e}")))?;

        let updated_table = tx
            .commit(&catalog)
            .await
            .map_err(|e| IcebergError::SnapshotCommit(format!("commit transaction: {e}")))?;

        let snapshot_id = updated_table
            .metadata()
            .current_snapshot_id()
            .unwrap_or_default();

        Ok(SnapshotInfo {
            snapshot_id,
            files_registered: files.len(),
        })
    }

    async fn build_catalog(&self) -> Result<impl Catalog, IcebergError> {
        let mut props = HashMap::new();
        props.insert("uri".to_string(), self.config.catalog_url.clone());
        props.insert(
            "s3.access-key-id".to_string(),
            self.credentials.access_key_id.clone(),
        );
        props.insert(
            "s3.secret-access-key".to_string(),
            self.credentials.secret_access_key.clone(),
        );

        RestCatalogBuilder::default()
            .load("rest", props)
            .await
            .map_err(|e| IcebergError::CatalogConnection(format!("build catalog: {e}")))
    }

    async fn ensure_namespace_exists(
        &self,
        catalog: &impl Catalog,
        namespace: &NamespaceIdent,
    ) -> Result<(), IcebergError> {
        match catalog.namespace_exists(namespace).await {
            Ok(true) => Ok(()),
            Ok(false) => {
                catalog
                    .create_namespace(namespace, HashMap::new())
                    .await
                    .map_err(|e| {
                        IcebergError::CatalogConnection(format!("create namespace: {e}"))
                    })?;
                Ok(())
            }
            Err(e) => Err(IcebergError::CatalogConnection(format!(
                "check namespace: {e}"
            ))),
        }
    }

    async fn create_table(
        &self,
        catalog: &impl Catalog,
        namespace: &NamespaceIdent,
    ) -> Result<iceberg::table::Table, IcebergError> {
        let schema = build_iceberg_schema()?;
        let partition_spec = build_partition_spec()?;

        let creation = TableCreation::builder()
            .name(self.config.table_name.clone())
            .schema(schema)
            .partition_spec(partition_spec)
            .build();

        catalog
            .create_table(namespace, creation)
            .await
            .map_err(|e| IcebergError::CatalogConnection(format!("create table: {e}")))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn client_new_stores_config() {
        let config = IcebergConfig::new("http://localhost:8181");
        let credentials = CloudCredentials::new("test_key", "test_secret");

        let client = RestCatalogClient::new(config.clone(), credentials);

        assert_eq!(client.config.catalog_url, "http://localhost:8181");
        assert_eq!(client.config.namespace, "vuke");
    }

    #[test]
    fn client_with_custom_namespace() {
        let config = IcebergConfig::new("http://localhost:8181").with_namespace("custom");
        let credentials = CloudCredentials::new("test_key", "test_secret");

        let client = RestCatalogClient::new(config, credentials);

        assert_eq!(client.config.namespace, "custom");
    }

    #[test]
    fn file_metadata_creation() {
        let meta = FileMetadata {
            uri: "s3://bucket/path/file.parquet".to_string(),
            file_size: 1024,
            record_count: 100,
            partition_values: None,
        };

        assert_eq!(meta.uri, "s3://bucket/path/file.parquet");
        assert_eq!(meta.file_size, 1024);
        assert_eq!(meta.record_count, 100);
    }
}