floe-core 0.3.7

Core library for Floe, a YAML-driven technical ingestion tool.
Documentation
use std::collections::BTreeMap;
use std::fs;
use std::path::{Path, PathBuf};

use serde::{Deserialize, Serialize};
use tempfile::NamedTempFile;

use crate::config::{
    ConfigBase, EntityConfig, IncrementalMode, ResolvedPath, RootConfig, StorageResolver,
};
use crate::io::storage::extensions;
use crate::{ConfigError, FloeResult};

/// Schema identifier stored in the `schema` field of [`EntityState`]; written by
/// [`EntityState::new`] and required by [`validate_entity_state`].
pub const ENTITY_STATE_SCHEMA_V1: &str = "floe.state.file-ingest.v1";
/// File name used as the last component of every state path built in this module.
pub const ENTITY_STATE_FILENAME: &str = "state.json";

/// State document for a single entity, serialized to and from JSON via serde.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntityState {
    /// Schema identifier; [`validate_entity_state`] rejects any value other than
    /// [`ENTITY_STATE_SCHEMA_V1`].
    pub schema: String,
    /// Name of the entity this state belongs to; [`validate_entity_state`] compares it
    /// with the entity configuration's name.
    pub entity: String,
    /// NOTE(review): presumably the time of the last update; the string format is not
    /// established in this module — confirm against the code that writes it.
    pub updated_at: Option<String>,
    /// Per-file records. A document without a `files` field deserializes to an empty map.
    /// NOTE(review): keys are presumably file paths or URIs — confirm against the code
    /// that populates this map.
    #[serde(default)]
    pub files: BTreeMap<String, EntityFileState>,
}

impl EntityState {
    pub fn new(entity: impl Into<String>) -> Self {
        Self {
            schema: ENTITY_STATE_SCHEMA_V1.to_string(),
            entity: entity.into(),
            updated_at: None,
            files: BTreeMap::new(),
        }
    }
}

/// Record kept for one file inside [`EntityState::files`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntityFileState {
    /// NOTE(review): presumably the time the file was processed; the string format is
    /// not established in this module — confirm against the writer.
    pub processed_at: String,
    /// NOTE(review): presumably the file size in bytes, when known — confirm.
    pub size: Option<u64>,
    /// NOTE(review): presumably the file's modification time, when known; format not
    /// established here — confirm.
    pub mtime: Option<String>,
}

/// Result of [`inspect_entity_state`]: where an entity's state lives and what it contains.
#[derive(Debug, Clone)]
pub struct EntityStateInspection {
    /// Name of the inspected entity, copied from its configuration.
    pub entity_name: String,
    /// The entity's `incremental_mode`, copied from its configuration.
    pub incremental_mode: IncrementalMode,
    /// Resolved location of the state file (see [`resolve_entity_state_path`]).
    pub path: ResolvedPath,
    /// The validated state, or `None` when no state could be read from the local path.
    pub state: Option<EntityState>,
}

/// Resolves where the state file of `entity` is stored.
///
/// * When `entity.state.path` is configured, a remote URI (see `is_remote_uri`) is
///   resolved through `resolver.resolve_path` using the source's storage, and any
///   other value through `resolver.resolve_local_path`.
/// * Otherwise the default location
///   `<source root>/.floe/state/<entity name>/state.json` is derived from the source
///   path and resolved through the source's storage.
///
/// In every case a resolved path without a `local_path` gets one assigned by
/// `with_local_state_fallback`.
///
/// # Errors
///
/// Propagates any error returned by the resolver.
pub fn resolve_entity_state_path(
    resolver: &StorageResolver,
    entity: &EntityConfig,
) -> FloeResult<ResolvedPath> {
    let storage = entity.source.storage.as_deref();
    let configured_path = entity
        .state
        .as_ref()
        .and_then(|state| state.path.as_deref());

    let resolved = match configured_path {
        Some(path) if is_remote_uri(path) => {
            resolver.resolve_path(&entity.name, "entity.state.path", storage, path)?
        }
        Some(path) => resolver.resolve_local_path(path)?,
        None => {
            // No explicit state path: place the state next to the source data.
            let resolved_source = resolver.resolve_path(
                &entity.name,
                "entity.source.path",
                storage,
                &entity.source.path,
            )?;
            let source_root = derive_source_root(
                &entity.source.path,
                &entity.source.format,
                resolved_source.local_path.as_deref(),
            );
            let default_path = join_state_path(&source_root, &entity.name);
            resolver.resolve_path(&entity.name, "entity.state.path", storage, &default_path)?
        }
    };

    Ok(with_local_state_fallback(resolver, entity, resolved))
}

/// Ensures `resolved` carries a local path.
///
/// A path that already has a `local_path` is returned unchanged; otherwise the
/// location computed by `default_local_state_cache_path` for `resolved.uri` is
/// filled in.
fn with_local_state_fallback(
    resolver: &StorageResolver,
    entity: &EntityConfig,
    mut resolved: ResolvedPath,
) -> ResolvedPath {
    if resolved.local_path.is_some() {
        return resolved;
    }

    let cache_path = default_local_state_cache_path(resolver, entity, &resolved.uri);
    resolved.local_path = Some(cache_path);
    resolved
}

/// Builds the local path used for an entity's state when the resolved state path has
/// no local path of its own.
///
/// * Remote configuration (`resolver.config_is_remote()`):
///   `<cache root>/<hash of resolved_uri>/<entity name>/state.json`.
/// * Otherwise: `<resolver.config_local_dir()>/.floe/state/<entity name>/state.json`.
fn default_local_state_cache_path(
    resolver: &StorageResolver,
    entity: &EntityConfig,
    resolved_uri: &str,
) -> PathBuf {
    let state_root = if resolver.config_is_remote() {
        // The hash of the resolved URI is part of the path, so different URIs map
        // to different directories under the shared cache root.
        remote_config_state_cache_root().join(short_stable_hash_hex(resolved_uri))
    } else {
        resolver.config_local_dir().join(".floe/state")
    };

    state_root.join(&entity.name).join(ENTITY_STATE_FILENAME)
}

/// Picks the root directory for cached state when the configuration is remote.
///
/// Candidates, in order:
/// 1. `$XDG_CACHE_HOME/floe/state` when `XDG_CACHE_HOME` is set to an absolute path.
/// 2. `$HOME/.cache/floe/state` when `HOME` is set and non-empty.
/// 3. `<current dir>/.floe/state`, with `.` standing in for the current directory
///    when it cannot be determined.
fn remote_config_state_cache_root() -> PathBuf {
    // Empty or relative values of XDG_CACHE_HOME are ignored.
    let xdg_cache = std::env::var_os("XDG_CACHE_HOME")
        .map(PathBuf::from)
        .filter(|dir| dir.is_absolute());
    if let Some(dir) = xdg_cache {
        return dir.join("floe/state");
    }

    // An empty HOME is treated like an unset one; joining onto it would silently
    // produce a path relative to the working directory (`.cache/floe/state`).
    let home = std::env::var_os("HOME").filter(|home| !home.is_empty());
    if let Some(home) = home {
        return PathBuf::from(home).join(".cache/floe/state");
    }

    std::env::current_dir()
        .unwrap_or_else(|_| PathBuf::from("."))
        .join(".floe/state")
}

/// Hashes `value` with 64-bit FNV-1a and returns the digest as 16 lowercase hex digits.
///
/// The result depends only on the bytes of `value`.
fn short_stable_hash_hex(value: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = value.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    format!("{digest:016x}")
}

/// Reads and parses the state file at `path`.
///
/// Returns `Ok(None)` when no file exists at `path`, and `Ok(Some(state))` when the
/// file was read and parsed. The parsed state is not validated here; see
/// [`validate_entity_state`].
///
/// # Errors
///
/// Returns an error when the file exists but cannot be read (any I/O error other than
/// "not found") or when its contents are not a valid JSON `EntityState`.
pub fn read_entity_state(path: &Path) -> FloeResult<Option<EntityState>> {
    // Read directly rather than probing with `exists()` first: the probe can race with
    // a concurrent delete and reports `false` for errors such as denied permissions,
    // which would silently look like "no state yet".
    let payload = match fs::read_to_string(path) {
        Ok(payload) => payload,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(err) => return Err(err.into()),
    };
    let state: EntityState = serde_json::from_str(&payload)?;
    Ok(Some(state))
}

/// Loads the configuration at `config_path` and inspects the state of `entity_name`.
///
/// Convenience wrapper that calls `crate::load_config` and hands the result to
/// [`inspect_entity_state`].
///
/// # Errors
///
/// Fails when the configuration cannot be loaded, or for any reason
/// [`inspect_entity_state`] fails.
pub fn inspect_entity_state_with_base(
    config_path: &Path,
    config_base: ConfigBase,
    entity_name: &str,
) -> FloeResult<EntityStateInspection> {
    let loaded = crate::load_config(config_path)?;
    inspect_entity_state(&loaded, config_base, entity_name)
}

/// Resolves the state location of `entity_name` and loads its state, if any.
///
/// The state is read from the resolved local path and checked with
/// [`validate_entity_state`]. `state` in the result is `None` when there is no local
/// path or no state file at it.
///
/// # Errors
///
/// Fails when the entity is unknown, the path cannot be resolved, the state file
/// cannot be read or parsed, or the loaded state fails validation.
pub fn inspect_entity_state(
    config: &RootConfig,
    config_base: ConfigBase,
    entity_name: &str,
) -> FloeResult<EntityStateInspection> {
    let (entity, path) = resolve_entity_state_target(config, config_base, entity_name)?;

    let state = if let Some(local_path) = path.local_path.as_deref() {
        match read_entity_state(local_path)? {
            Some(loaded) => Some(validate_entity_state(entity, loaded)?),
            None => None,
        }
    } else {
        None
    };

    Ok(EntityStateInspection {
        entity_name: entity.name.clone(),
        incremental_mode: entity.incremental_mode,
        path,
        state,
    })
}

/// Deletes the local state file of `entity_name` for the configuration at `config_path`.
///
/// Returns `Ok(true)` when a state file was removed and `Ok(false)` when there was
/// none to remove.
///
/// # Errors
///
/// Fails when the configuration cannot be loaded, the entity is unknown, the state
/// path cannot be resolved or has no local path, or the file exists but cannot be
/// removed.
pub fn reset_entity_state_with_base(
    config_path: &Path,
    config_base: ConfigBase,
    entity_name: &str,
) -> FloeResult<bool> {
    let config = crate::load_config(config_path)?;
    let (entity, path) = resolve_entity_state_target(&config, config_base, entity_name)?;
    let Some(local_path) = path.local_path.as_ref() else {
        return Err(Box::new(ConfigError(format!(
            "entity.name={} state path is not local and cannot be reset: {}",
            entity.name, path.uri
        ))));
    };

    // Remove directly instead of checking `exists()` first: the check can race with a
    // concurrent delete and hides probe failures behind a "nothing to reset" result.
    match fs::remove_file(local_path) {
        Ok(()) => Ok(true),
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false),
        Err(err) => Err(err.into()),
    }
}

/// Looks up `entity_name` in `config` and resolves the path of its state file.
///
/// Returns the matching entity configuration together with the resolved path.
///
/// # Errors
///
/// Returns a `ConfigError` when no entity has that name, and propagates failures from
/// building the `StorageResolver` or resolving the path.
fn resolve_entity_state_target<'a>(
    config: &'a RootConfig,
    config_base: ConfigBase,
    entity_name: &str,
) -> FloeResult<(&'a EntityConfig, ResolvedPath)> {
    let matching = config
        .entities
        .iter()
        .find(|candidate| candidate.name == entity_name);
    let Some(entity) = matching else {
        return Err(Box::new(ConfigError(format!(
            "entity not found: {entity_name}"
        ))));
    };

    let resolver = StorageResolver::new(config, config_base)?;
    let state_path = resolve_entity_state_path(&resolver, entity)?;
    Ok((entity, state_path))
}

/// Writes `state` as pretty-printed JSON to `path` via a temporary file.
///
/// The parent directory is created if needed. The JSON is written to a temporary file
/// in that directory and synced before the file is persisted to `path`.
///
/// # Errors
///
/// Returns a `ConfigError` when `path` has no parent directory, and propagates I/O and
/// serialization failures.
pub fn write_entity_state_atomic(path: &Path, state: &EntityState) -> FloeResult<()> {
    let Some(target_dir) = path.parent() else {
        return Err(Box::new(ConfigError(format!(
            "state path has no parent directory: {}",
            path.display()
        ))));
    };
    fs::create_dir_all(target_dir)?;

    // Stage the file in the destination directory, next to its final location.
    let mut staged = NamedTempFile::new_in(target_dir)?;
    {
        let file = staged.as_file_mut();
        serde_json::to_writer_pretty(&mut *file, state)?;
        file.sync_all()?;
    }

    if let Err(persist_err) = staged.persist(path) {
        return Err(persist_err.error.into());
    }
    Ok(())
}

/// Builds the default state path for `entity_name` under `source_root`.
///
/// An empty root or `.` yields the relative path
/// `.floe/state/<entity_name>/state.json`; any other root is prefixed after its
/// trailing path separators are stripped.
fn join_state_path(source_root: &str, entity_name: &str) -> String {
    let relative = format!(".floe/state/{entity_name}/{ENTITY_STATE_FILENAME}");
    match source_root {
        "" | "." => relative,
        root => format!("{}/{relative}", root.trim_end_matches(is_path_separator)),
    }
}

/// Derives the directory ("source root") under which an entity's default state is
/// placed, from the configured source path.
///
/// * `raw_path` – the source path as configured; trailing separators are ignored.
/// * `source_format` – the source format, used to recognise file suffixes.
/// * `resolved_local_path` – the local path the source resolved to, if any; used to
///   tell directories from files when it exists on disk.
///
/// Rules, in order:
/// 1. An empty path yields an empty root.
/// 2. A path containing a glob character yields the directory part of the text
///    before the first glob character.
/// 3. An existing local directory yields the path itself; an existing local file
///    yields its directory part.
/// 4. A path whose last segment ends with a known suffix for `source_format` is
///    treated as a file and yields its directory part.
/// 5. Anything else is treated as a directory and returned as is.
///
/// A file or glob stem without any separator (for example `orders.csv` or
/// `file_*.csv`) has no directory part and yields an empty root, the same result
/// as a bare `*.csv`.
///
/// NOTE(review): a root-level path such as `/file.csv` also ends up with an empty
/// root, because the trailing-separator trim strips the `/` returned by
/// `parent_like`; confirm whether that is intended.
fn derive_source_root(
    raw_path: &str,
    source_format: &str,
    resolved_local_path: Option<&Path>,
) -> String {
    let trimmed = raw_path.trim_end_matches(is_path_separator);
    if trimmed.is_empty() {
        return String::new();
    }

    if let Some(prefix) = prefix_before_first_glob(trimmed) {
        if prefix.is_empty() {
            return String::new();
        }
        // `dir/*.csv`: everything before the glob is already a directory.
        if prefix.ends_with(is_path_separator) {
            return prefix.trim_end_matches(is_path_separator).to_string();
        }
        // `dir/file_*.csv`: the glob starts mid-segment, so drop the partial segment.
        return containing_directory(prefix);
    }

    if let Some(path) = resolved_local_path.filter(|path| path.exists()) {
        if path.is_dir() {
            return trimmed.to_string();
        }
        if path.is_file() {
            return containing_directory(trimmed);
        }
    }

    // Nothing on disk to inspect: fall back to the file suffix.
    if matches_source_file_suffix(trimmed, source_format) {
        return containing_directory(trimmed);
    }

    trimmed.to_string()
}

/// Returns the directory part of `value` without trailing separators, or an empty
/// string when `value` contains no separator.
fn containing_directory(value: &str) -> String {
    parent_like(value)
        .unwrap_or("")
        .trim_end_matches(is_path_separator)
        .to_string()
}

/// Returns the part of `value` that precedes its first glob character (`*`, `?` or
/// `[`), or `None` when `value` contains none of them.
fn prefix_before_first_glob(value: &str) -> Option<&str> {
    value
        .char_indices()
        .find(|(_, ch)| matches!(ch, '*' | '?' | '['))
        .map(|(glob_start, _)| &value[..glob_start])
}

/// Reports whether the last path segment of `value` ends with one of the suffixes
/// that `extensions::suffixes_for_format` returns for `source_format`.
///
/// The comparison ignores ASCII case. The result is `false` when no suffixes are
/// available for the format.
fn matches_source_file_suffix(value: &str, source_format: &str) -> bool {
    let file_name = match value.rsplit(is_path_separator).next() {
        Some(last_segment) => last_segment.to_ascii_lowercase(),
        None => return false,
    };

    extensions::suffixes_for_format(source_format).map_or(false, |known| {
        known
            .iter()
            .any(|suffix| file_name.ends_with(&suffix.to_ascii_lowercase()))
    })
}

/// Returns the text before the last path separator in `value`, or `None` when `value`
/// contains no separator.
///
/// When the only separator is the first character, that single character is returned
/// so a root-level path keeps its root.
fn parent_like(value: &str) -> Option<&str> {
    let last_separator = value.rfind(is_path_separator)?;
    // Separators are single-byte ASCII, so keeping one byte keeps exactly the root.
    let end = if last_separator == 0 { 1 } else { last_separator };
    Some(&value[..end])
}

/// Reports whether `ch` is a forward slash or a backslash.
fn is_path_separator(ch: char) -> bool {
    matches!(ch, '/' | '\\')
}

/// Reports whether `value` starts with one of the remote schemes recognised here:
/// `s3://`, `gs://` or `abfs://`. The match is case-sensitive.
fn is_remote_uri(value: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 3] = ["s3://", "gs://", "abfs://"];
    REMOTE_SCHEMES
        .iter()
        .any(|scheme| value.starts_with(*scheme))
}

/// Checks that `state` belongs to `entity` and uses the supported schema.
///
/// Returns `state` unchanged when both checks pass.
///
/// # Errors
///
/// Returns a `ConfigError` when `state.schema` differs from
/// [`ENTITY_STATE_SCHEMA_V1`] or when `state.entity` differs from `entity.name`.
/// The schema is checked first.
pub fn validate_entity_state(entity: &EntityConfig, state: EntityState) -> FloeResult<EntityState> {
    let problem = if state.schema != ENTITY_STATE_SCHEMA_V1 {
        Some(format!(
            "entity.name={} state schema mismatch: expected {}, got {}",
            entity.name, ENTITY_STATE_SCHEMA_V1, state.schema
        ))
    } else if state.entity != entity.name {
        Some(format!(
            "entity.name={} state entity mismatch: expected {}, got {}",
            entity.name, entity.name, state.entity
        ))
    } else {
        None
    };

    match problem {
        Some(message) => Err(Box::new(ConfigError(message))),
        None => Ok(state),
    }
}