floe-core 0.3.9

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::collections::HashMap;
use std::path::Path;

use crate::checks::normalize::rename_output_columns;
use crate::errors::StorageError;
use crate::io::read::parquet::read_parquet_lazy;
use crate::io::storage::Target;
use crate::io::write::{parts, strategy};
use crate::{check, config, io, FloeResult};

use super::FormatSeeder;

/// Seeder that feeds a unique tracker from parquet part files that already
/// exist at a storage target.
///
/// All fields are borrowed for `'a`; the seeder owns no data of its own.
pub(super) struct ParquetSeeder<'a> {
    /// Target whose parquet parts are read; its variant (local vs. S3/GCS/ADLS)
    /// selects the read path taken by `seed`.
    pub(super) target: &'a Target,
    /// Directory that remote objects are downloaded into. `seed` returns an
    /// error for S3/GCS/ADLS targets when this is `None`; it is not consulted
    /// for local targets.
    pub(super) temp_dir: Option<&'a Path>,
    /// Cloud client handle, used both to list part objects and to obtain the
    /// storage client that performs the downloads.
    pub(super) cloud: &'a mut io::storage::CloudClient,
    /// Storage resolver handed to the listing context and to `client_for`.
    pub(super) resolver: &'a config::StorageResolver,
    /// Entity being processed; its `name` appears in the missing-temp-dir
    /// error message.
    pub(super) entity: &'a config::EntityConfig,
}

impl FormatSeeder for ParquetSeeder<'_> {
    /// Seeds `unique_tracker` from every parquet part found at the target.
    ///
    /// * Local targets: each path returned by `parts::list_local_part_paths`
    ///   for the `"parquet"` extension under the target's base path is read
    ///   in place.
    /// * S3 / GCS / ADLS targets: part objects are listed through
    ///   `strategy::list_part_objects`, each matching object is downloaded
    ///   into `temp_dir`, and the downloaded copy is read.
    ///
    /// `scan_cols` and `rename_back` are forwarded unchanged to
    /// `seed_from_parquet_path` for every part.
    ///
    /// # Errors
    ///
    /// Returns a `StorageError` when the target is remote and no temp
    /// directory was supplied. Any error from listing, client creation,
    /// download, or per-file seeding is propagated, and processing stops at
    /// the first failure.
    fn seed(
        &mut self,
        unique_tracker: &mut check::UniqueTracker,
        scan_cols: &[String],
        rename_back: &HashMap<String, String>,
    ) -> FloeResult<()> {
        match self.target {
            Target::Local { base_path, .. } => {
                for part_path in parts::list_local_part_paths(Path::new(base_path), "parquet")? {
                    seed_from_parquet_path(unique_tracker, &part_path, scan_cols, rename_back)?;
                }
            }
            Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => {
                // Remote parts must be materialised on disk first, so a
                // download directory is checked before any remote call is made.
                let temp_dir = match self.temp_dir {
                    Some(dir) => dir,
                    None => {
                        let message = format!(
                            "entity.name={} missing temp dir for parquet read",
                            self.entity.name
                        );
                        return Err(Box::new(StorageError(message)).into());
                    }
                };

                let spec = strategy::accepted_parquet_spec();

                // The listing context is a temporary, so its mutable borrow of
                // `self.cloud` ends with this statement and the client can be
                // requested right after.
                let (list_prefix, objects) = strategy::list_part_objects(
                    &mut strategy::WriteContext {
                        target: self.target,
                        cloud: self.cloud,
                        resolver: self.resolver,
                        entity: self.entity,
                    },
                    spec,
                )?;

                let client =
                    self.cloud
                        .client_for(self.resolver, self.target.storage(), self.entity)?;

                for object in objects {
                    // Only keys under the listed prefix that are part keys for
                    // the spec's extension are downloaded.
                    let is_wanted = object.key.starts_with(&list_prefix)
                        && parts::is_part_key(&object.key, spec.extension);
                    if !is_wanted {
                        continue;
                    }

                    // NOTE(review): downloaded files are not removed here —
                    // presumably the owner of `temp_dir` cleans up; confirm.
                    let local_path = client.download_to_temp(&object.uri, temp_dir)?;
                    seed_from_parquet_path(unique_tracker, &local_path, scan_cols, rename_back)?;
                }
            }
        }
        Ok(())
    }
}

/// Seeds `unique_tracker` from a single parquet file on the local filesystem.
///
/// The file at `path` is read through `read_parquet_lazy` with `scan_cols`
/// passed as the column selection, `rename_output_columns` is applied to the
/// result using the `rename_back` mapping, and the frame is then handed to
/// `UniqueTracker::seed_from_df`.
///
/// # Errors
///
/// Propagates the first error raised by reading, renaming, or seeding.
fn seed_from_parquet_path(
    unique_tracker: &mut check::UniqueTracker,
    path: &Path,
    scan_cols: &[String],
    rename_back: &HashMap<String, String>,
) -> FloeResult<()> {
    let mut frame = read_parquet_lazy(path, Some(scan_cols))?;

    // Renaming happens before seeding, so the tracker sees the column names
    // produced by `rename_back`.
    rename_output_columns(&mut frame, rename_back)?;

    unique_tracker.seed_from_df(&frame)?;
    Ok(())
}