floe-core 0.3.6

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::fs;
use std::path::{Path, PathBuf};

use crate::io::storage::ObjectRef;
use crate::{io, FloeResult};

pub(super) fn latest_local_metadata_location(table_root: &Path) -> FloeResult<Option<String>> {
    let metadata_dir = table_root.join("metadata");
    if !metadata_dir.exists() {
        return Ok(None);
    }

    let mut best: Option<(i64, PathBuf)> = None;
    for entry in fs::read_dir(&metadata_dir)? {
        let entry = entry?;
        let path = entry.path();
        if !path.is_file() {
            continue;
        }
        let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
            continue;
        };
        if !file_name.ends_with(".metadata.json") {
            continue;
        }
        let Some(version) = parse_metadata_version_from_filename(file_name) else {
            continue;
        };
        let replace = match &best {
            None => true,
            Some((best_version, best_path)) => {
                version > *best_version || (version == *best_version && path > *best_path)
            }
        };
        if replace {
            best = Some((version, path));
        }
    }

    Ok(best.map(|(_, path)| path.display().to_string()))
}

pub(super) fn latest_s3_metadata_location(
    client: &mut dyn io::storage::StorageClient,
    base_key: &str,
) -> FloeResult<Option<String>> {
    let metadata_prefix = if base_key.trim_matches('/').is_empty() {
        "metadata/".to_string()
    } else {
        format!("{}/metadata/", base_key.trim_matches('/'))
    };
    let listed = client.list(&metadata_prefix)?;
    latest_metadata_location_from_objects(listed)
}

pub(super) fn latest_gcs_metadata_location(
    client: &mut dyn io::storage::StorageClient,
    base_key: &str,
) -> FloeResult<Option<String>> {
    let metadata_prefix = if base_key.trim_matches('/').is_empty() {
        "metadata/".to_string()
    } else {
        format!("{}/metadata/", base_key.trim_matches('/'))
    };
    let listed = client.list(&metadata_prefix)?;
    latest_metadata_location_from_objects(listed)
}

fn latest_metadata_location_from_objects(objects: Vec<ObjectRef>) -> FloeResult<Option<String>> {
    let mut best: Option<(i64, String, String)> = None;
    for object in objects {
        let file_name = object
            .key
            .rsplit('/')
            .next()
            .unwrap_or(object.key.as_str())
            .to_string();
        if !file_name.ends_with(".metadata.json") {
            continue;
        }
        let Some(version) = parse_metadata_version_from_filename(&file_name) else {
            continue;
        };
        let replace = match &best {
            None => true,
            Some((best_version, best_key, _)) => {
                version > *best_version || (version == *best_version && object.key > *best_key)
            }
        };
        if replace {
            best = Some((version, object.key.clone(), object.uri.clone()));
        }
    }
    Ok(best.map(|(_, _, uri)| uri))
}

pub(super) fn parse_metadata_version_from_location(location: &str) -> Option<i64> {
    let file_name = Path::new(location).file_name()?.to_str()?;
    parse_metadata_version_from_filename(file_name)
}

pub(super) fn parse_metadata_version_from_filename(file_name: &str) -> Option<i64> {
    let prefix = file_name.split_once('-')?.0;
    prefix.parse::<i64>().ok()
}