floe-core 0.3.7

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::path::{Path, PathBuf};

use glob::glob;

use crate::errors::{RunError, StorageError};
use crate::{config, ConfigError, FloeResult};

use crate::io::storage::{planner, ObjectRef, StorageClient};

/// Storage client that operates directly on the local filesystem.
///
/// The type is a stateless unit struct; every instance behaves the same.
pub struct LocalClient;

impl LocalClient {
    /// Creates a new local filesystem client.
    pub fn new() -> Self {
        LocalClient
    }
}

impl Default for LocalClient {
    /// Returns the same value as [`LocalClient::new`].
    fn default() -> Self {
        LocalClient
    }
}

/// Filesystem-backed implementation of [`StorageClient`].
///
/// Object URIs use the `local://` scheme followed by a filesystem path. A URI
/// that does not start with the scheme is treated as a plain path.
impl StorageClient for LocalClient {
    /// Lists the files found at `prefix`.
    ///
    /// * If `prefix` is a regular file, a single entry is returned whose key
    ///   is `prefix` verbatim.
    /// * If `prefix` does not exist, an empty list is returned.
    /// * Otherwise `prefix` is read as a directory (non-recursively) and one
    ///   entry is returned per contained file, ordered by
    ///   `planner::stable_sort_refs`.
    ///
    /// `last_modified` and `size` are always `None`.
    ///
    /// # Errors
    ///
    /// Propagates failures from reading the directory and from
    /// [`StorageClient::resolve_uri`].
    fn list(&self, prefix: &str) -> FloeResult<Vec<ObjectRef>> {
        // NOTE(review): unlike the other methods, `prefix` is used as a plain
        // path here and the `local://` scheme is not stripped - confirm that
        // callers never pass a URI.
        let path = Path::new(prefix);
        if path.is_file() {
            let uri = self.resolve_uri(prefix)?;
            return Ok(vec![ObjectRef {
                uri,
                key: prefix.to_string(),
                last_modified: None,
                size: None,
            }]);
        }
        if !path.exists() {
            return Ok(Vec::new());
        }
        let mut refs = Vec::new();
        for entry in std::fs::read_dir(path)? {
            let entry_path = entry?.path();
            // Sub-directories (and anything else that is not a file) are skipped.
            if entry_path.is_file() {
                let key = entry_path.display().to_string();
                let uri = self.resolve_uri(&key)?;
                refs.push(ObjectRef {
                    uri,
                    key,
                    last_modified: None,
                    size: None,
                });
            }
        }
        Ok(planner::stable_sort_refs(refs))
    }

    /// Copies the object at `uri` into `temp_dir` and returns the copy's path.
    ///
    /// The destination keeps the source file name; `"object"` is used when the
    /// source has no file name or the name is not valid UTF-8.
    ///
    /// # Errors
    ///
    /// Fails when the destination's parent directory cannot be prepared or the
    /// copy itself fails.
    fn download_to_temp(&self, uri: &str, temp_dir: &Path) -> FloeResult<PathBuf> {
        let src = local_path_from_uri(uri);
        let file_name = src
            .file_name()
            .and_then(|name| name.to_str())
            .unwrap_or("object");
        let dest = temp_dir.join(file_name);
        planner::ensure_parent_dir(&dest)?;
        std::fs::copy(src, &dest).map_err(|err| {
            local_storage_error(format!(
                "local download failed from {}: {err}",
                src.display()
            ))
        })?;
        Ok(dest)
    }

    /// Copies `local_path` to the location named by `uri`.
    ///
    /// # Errors
    ///
    /// Fails when the destination's parent directory cannot be prepared or the
    /// copy itself fails.
    fn upload_from_path(&self, local_path: &Path, uri: &str) -> FloeResult<()> {
        let dest = local_path_from_uri(uri);
        planner::ensure_parent_dir(dest)?;
        std::fs::copy(local_path, dest).map_err(|err| {
            local_storage_error(format!(
                "local upload failed to {}: {err}",
                dest.display()
            ))
        })?;
        Ok(())
    }

    /// Turns a filesystem path into a `local://` URI.
    ///
    /// Relative paths are joined onto the current working directory before
    /// being passed through `normalize_local_path`.
    ///
    /// # Errors
    ///
    /// Fails when the current working directory cannot be determined.
    fn resolve_uri(&self, path: &str) -> FloeResult<String> {
        let path = Path::new(path);
        let normalized = if path.is_absolute() {
            crate::io::storage::paths::normalize_local_path(path)
        } else {
            let abs = std::env::current_dir()?.join(path);
            crate::io::storage::paths::normalize_local_path(&abs)
        };
        Ok(format!("{LOCAL_URI_SCHEME}{}", normalized.display()))
    }

    /// Copies the object at `src_uri` to `dst_uri`.
    ///
    /// # Errors
    ///
    /// Fails when the destination's parent directory cannot be prepared or the
    /// copy itself fails.
    fn copy_object(&self, src_uri: &str, dst_uri: &str) -> FloeResult<()> {
        let src = local_path_from_uri(src_uri);
        let dst = local_path_from_uri(dst_uri);
        planner::ensure_parent_dir(dst)?;
        std::fs::copy(src, dst).map_err(|err| {
            local_storage_error(format!(
                "local copy failed from {} to {}: {err}",
                src.display(),
                dst.display()
            ))
        })?;
        Ok(())
    }

    /// Deletes the file at `uri`; a missing file is not an error.
    ///
    /// # Errors
    ///
    /// Fails when the removal fails for any reason other than the file not
    /// being there.
    fn delete_object(&self, uri: &str) -> FloeResult<()> {
        let path = local_path_from_uri(uri);
        // Remove first and inspect the error instead of checking `exists()`
        // beforehand: this avoids a check-then-act race and also removes
        // dangling symlinks, which `exists()` reports as absent.
        match std::fs::remove_file(path) {
            Ok(()) => Ok(()),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(err) => Err(local_storage_error(format!(
                "local delete failed for {}: {err}",
                path.display()
            ))),
        }
    }

    /// Reports whether something exists at the path named by `uri`.
    fn exists(&self, uri: &str) -> FloeResult<bool> {
        Ok(local_path_from_uri(uri).exists())
    }
}

/// URI scheme prefix identifying objects on the local filesystem.
const LOCAL_URI_SCHEME: &str = "local://";

/// Returns the filesystem path named by `uri`.
///
/// The scheme is removed at most once, so a path that itself begins with
/// `local://` is left intact after the leading scheme.
fn local_path_from_uri(uri: &str) -> &Path {
    Path::new(uri.strip_prefix(LOCAL_URI_SCHEME).unwrap_or(uri))
}

/// Boxes `message` as a [`StorageError`] trait object.
fn local_storage_error(message: String) -> Box<dyn std::error::Error + Send + Sync> {
    Box::new(StorageError(message))
}

/// How a local source path was interpreted when its input files were resolved.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalInputMode {
    /// The source path pointed at a single existing file.
    File,
    /// The inputs came from expanding a glob pattern or scanning a directory.
    Directory,
}

/// Result of resolving a local source into concrete input files.
#[derive(Debug, Clone)]
pub struct ResolvedLocalInputs {
    /// Files selected as inputs.
    pub files: Vec<PathBuf>,
    /// Whether the source was treated as a single file or as a directory/glob.
    pub mode: LocalInputMode,
}

pub fn resolve_local_inputs(
    config_dir: &Path,
    entity_name: &str,
    source: &config::SourceConfig,
    storage: &str,
    default_globs: &[String],
) -> FloeResult<ResolvedLocalInputs> {
    let default_options = config::SourceOptions::default();
    let options = source.options.as_ref().unwrap_or(&default_options);
    let recursive = options.recursive.unwrap_or(false);
    let glob_override = options.glob.as_deref();
    let raw_path = source.path.as_str();

    if is_glob_pattern(raw_path) {
        let pattern_path = resolve_glob_pattern(config_dir, raw_path);
        let pattern = pattern_path.to_string_lossy().to_string();
        let files = collect_glob_files(&pattern)?;
        if files.is_empty() {
            let (base_path, glob_used) = split_glob_details(&pattern_path, raw_path);
            return Err(Box::new(RunError(no_match_message(
                entity_name,
                storage,
                &base_path,
                &glob_used,
                recursive,
            ))));
        }
        return Ok(ResolvedLocalInputs {
            files,
            mode: LocalInputMode::Directory,
        });
    }

    let base_path = config::resolve_local_path(config_dir, raw_path);
    if base_path.is_file() {
        return Ok(ResolvedLocalInputs {
            files: vec![base_path],
            mode: LocalInputMode::File,
        });
    }

    let glob_used = if let Some(glob_override) = glob_override {
        vec![glob_override.to_string()]
    } else {
        default_globs.to_vec()
    };
    if !base_path.is_dir() {
        return Err(Box::new(RunError(no_match_message(
            entity_name,
            storage,
            &base_path.display().to_string(),
            &glob_used.join(","),
            recursive,
        ))));
    }

    let pattern_paths = if recursive {
        glob_used
            .iter()
            .map(|glob| base_path.join("**").join(glob))
            .collect::<Vec<_>>()
    } else {
        glob_used
            .iter()
            .map(|glob| base_path.join(glob))
            .collect::<Vec<_>>()
    };
    let files = collect_glob_files_multi(&pattern_paths)?;
    if files.is_empty() {
        return Err(Box::new(RunError(no_match_message(
            entity_name,
            storage,
            &base_path.display().to_string(),
            &glob_used.join(","),
            recursive,
        ))));
    }

    Ok(ResolvedLocalInputs {
        files,
        mode: LocalInputMode::Directory,
    })
}

/// Returns `true` when `value` contains any of the glob metacharacters
/// `*`, `?` or `[`.
fn is_glob_pattern(value: &str) -> bool {
    value.chars().any(|ch| matches!(ch, '*' | '?' | '['))
}

/// Anchors a glob pattern at `config_dir` unless it is already absolute.
fn resolve_glob_pattern(config_dir: &Path, raw_path: &str) -> PathBuf {
    let pattern = Path::new(raw_path);
    if !pattern.is_absolute() {
        return config_dir.join(raw_path);
    }
    pattern.to_path_buf()
}

/// Splits a glob pattern path into a literal base directory and the glob part,
/// for use in diagnostics.
///
/// The split happens at the first path component that contains a glob
/// metacharacter (`*`, `?` or `[`), so `/data/**/*.csv` yields
/// `("/data", "**/*.csv")` rather than a base that still contains wildcards.
/// A pattern whose only glob component is the last one splits into its parent
/// directory and file name.
///
/// When no component contains a metacharacter, the parent directory and file
/// name are returned; `raw_pattern` is the fallback glob text when the path
/// has no file name.
fn split_glob_details(pattern_path: &Path, raw_pattern: &str) -> (String, String) {
    /// Whether the last component of `path` contains a glob metacharacter.
    fn ends_in_glob_component(path: &Path) -> bool {
        path.file_name().map_or(false, |name| {
            name.to_string_lossy()
                .contains(|ch: char| matches!(ch, '*' | '?' | '['))
        })
    }

    // `ancestors` walks from the full path towards the root, so the last hit
    // is the glob component closest to the root.
    let first_glob_component = pattern_path
        .ancestors()
        .filter(|candidate| ends_in_glob_component(candidate))
        .last();
    if let Some(base) = first_glob_component.and_then(Path::parent) {
        if let Ok(tail) = pattern_path.strip_prefix(base) {
            return (
                base.display().to_string(),
                tail.to_string_lossy().into_owned(),
            );
        }
    }

    // No glob component found: fall back to a plain parent/file-name split.
    let base = pattern_path
        .parent()
        .unwrap_or(pattern_path)
        .display()
        .to_string();
    let glob_used = pattern_path
        .file_name()
        .map(|name| name.to_string_lossy().into_owned())
        .unwrap_or_else(|| raw_pattern.to_string());
    (base, glob_used)
}

fn collect_glob_files(pattern: &str) -> FloeResult<Vec<PathBuf>> {
    let mut files = Vec::new();
    let entries = glob(pattern).map_err(|err| {
        Box::new(ConfigError(format!(
            "invalid glob pattern {pattern:?}: {err}"
        ))) as Box<dyn std::error::Error + Send + Sync>
    })?;
    for entry in entries {
        let path = entry.map_err(|err| {
            Box::new(ConfigError(format!(
                "glob match failed for {pattern:?}: {err}"
            ))) as Box<dyn std::error::Error + Send + Sync>
        })?;
        if path.is_file() {
            files.push(crate::io::storage::paths::normalize_local_path(&path));
        }
    }
    files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
    Ok(files)
}

fn collect_glob_files_multi(patterns: &[PathBuf]) -> FloeResult<Vec<PathBuf>> {
    let mut files = Vec::new();
    for pattern_path in patterns {
        let pattern = pattern_path.to_string_lossy().to_string();
        files.extend(collect_glob_files(&pattern)?);
    }
    files.sort_by(|a, b| a.to_string_lossy().cmp(&b.to_string_lossy()));
    files.dedup_by(|a, b| a.to_string_lossy() == b.to_string_lossy());
    Ok(files)
}

/// Builds the diagnostic text used when a source yields no input files.
///
/// The message names the entity and storage, followed by the base path, the
/// glob(s) and the `recursive` flag that were in effect.
fn no_match_message(
    entity_name: &str,
    storage: &str,
    base_path: &str,
    glob_used: &str,
    recursive: bool,
) -> String {
    format!(
        "entity.name={entity_name} source.storage={storage} no input files matched \
         (base_path={base_path}, glob={glob_used}, recursive={recursive})"
    )
}