// floe_core/state/mod.rs

1use std::collections::BTreeMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use serde::{Deserialize, Serialize};
6use tempfile::NamedTempFile;
7
8use crate::config::{
9    ConfigBase, EntityConfig, IncrementalMode, ResolvedPath, RootConfig, StorageResolver,
10};
11use crate::io::storage::extensions;
12use crate::{ConfigError, FloeResult};
13
/// Schema tag stamped into every entity state file; `validate_entity_state`
/// rejects state whose `schema` differs from this value.
pub const ENTITY_STATE_SCHEMA_V1: &str = "floe.state.file-ingest.v1";

/// File name of the per-entity state document inside its state directory.
pub const ENTITY_STATE_FILENAME: &str = "state.json";
16
17#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
18pub struct EntityState {
19    pub schema: String,
20    pub entity: String,
21    pub updated_at: Option<String>,
22    #[serde(default)]
23    pub files: BTreeMap<String, EntityFileState>,
24}
25
26impl EntityState {
27    pub fn new(entity: impl Into<String>) -> Self {
28        Self {
29            schema: ENTITY_STATE_SCHEMA_V1.to_string(),
30            entity: entity.into(),
31            updated_at: None,
32            files: BTreeMap::new(),
33        }
34    }
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
38pub struct EntityFileState {
39    pub processed_at: String,
40    pub size: Option<u64>,
41    pub mtime: Option<String>,
42}
43
44#[derive(Debug, Clone)]
45pub struct EntityStateInspection {
46    pub entity_name: String,
47    pub incremental_mode: IncrementalMode,
48    pub path: ResolvedPath,
49    pub state: Option<EntityState>,
50}
51
52pub fn resolve_entity_state_path(
53    resolver: &StorageResolver,
54    entity: &EntityConfig,
55) -> FloeResult<ResolvedPath> {
56    if let Some(state) = &entity.state {
57        if let Some(path) = state.path.as_deref() {
58            let resolved = if is_remote_uri(path) {
59                resolver.resolve_path(
60                    &entity.name,
61                    "entity.state.path",
62                    entity.source.storage.as_deref(),
63                    path,
64                )?
65            } else {
66                resolver.resolve_local_path(path)?
67            };
68            return Ok(with_local_state_fallback(resolver, entity, resolved));
69        }
70    }
71
72    let resolved_source = resolver.resolve_path(
73        &entity.name,
74        "entity.source.path",
75        entity.source.storage.as_deref(),
76        &entity.source.path,
77    )?;
78    let source_root = derive_source_root(
79        &entity.source.path,
80        &entity.source.format,
81        resolved_source.local_path.as_deref(),
82    );
83    let default_path = join_state_path(&source_root, &entity.name);
84    let resolved = resolver.resolve_path(
85        &entity.name,
86        "entity.state.path",
87        entity.source.storage.as_deref(),
88        &default_path,
89    )?;
90    Ok(with_local_state_fallback(resolver, entity, resolved))
91}
92
93fn with_local_state_fallback(
94    resolver: &StorageResolver,
95    entity: &EntityConfig,
96    mut resolved: ResolvedPath,
97) -> ResolvedPath {
98    if resolved.local_path.is_none() {
99        resolved.local_path = Some(default_local_state_cache_path(
100            resolver,
101            entity,
102            &resolved.uri,
103        ));
104    }
105    resolved
106}
107
108fn default_local_state_cache_path(
109    resolver: &StorageResolver,
110    entity: &EntityConfig,
111    resolved_uri: &str,
112) -> PathBuf {
113    if resolver.config_is_remote() {
114        remote_config_state_cache_root()
115            .join(short_stable_hash_hex(resolved_uri))
116            .join(&entity.name)
117            .join(ENTITY_STATE_FILENAME)
118    } else {
119        resolver
120            .config_local_dir()
121            .join(".floe/state")
122            .join(&entity.name)
123            .join(ENTITY_STATE_FILENAME)
124    }
125}
126
/// Root directory for locally cached state when the config itself is remote.
///
/// The first match wins:
/// 1. `$XDG_CACHE_HOME/floe/state`, but only when `XDG_CACHE_HOME` is absolute.
///    An empty or relative value is ignored.
/// 2. `$HOME/.cache/floe/state`.
/// 3. `<current dir>/.floe/state`, or `./.floe/state` if the cwd is unavailable.
fn remote_config_state_cache_root() -> PathBuf {
    let xdg_cache_home = std::env::var_os("XDG_CACHE_HOME")
        .map(PathBuf::from)
        .filter(|candidate| candidate.is_absolute());
    if let Some(cache_home) = xdg_cache_home {
        return cache_home.join("floe/state");
    }

    match std::env::var_os("HOME") {
        Some(home) => PathBuf::from(home).join(".cache/floe/state"),
        None => std::env::current_dir()
            .unwrap_or_else(|_| PathBuf::from("."))
            .join(".floe/state"),
    }
}
141
/// Returns the 64-bit FNV-1a hash of `value` as 16 lowercase hex digits.
///
/// Unlike `std`'s randomly seeded hasher, this is stable across runs and
/// platforms, so it is safe to use in on-disk directory names.
fn short_stable_hash_hex(value: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

    let digest = value.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    format!("{:016x}", digest)
}
150
151pub fn read_entity_state(path: &Path) -> FloeResult<Option<EntityState>> {
152    if !path.exists() {
153        return Ok(None);
154    }
155    let payload = fs::read_to_string(path)?;
156    let state: EntityState = serde_json::from_str(&payload)?;
157    Ok(Some(state))
158}
159
160pub fn inspect_entity_state_with_base(
161    config_path: &Path,
162    config_base: ConfigBase,
163    entity_name: &str,
164) -> FloeResult<EntityStateInspection> {
165    let config = crate::load_config(config_path)?;
166    inspect_entity_state(&config, config_base, entity_name)
167}
168
169pub fn inspect_entity_state(
170    config: &RootConfig,
171    config_base: ConfigBase,
172    entity_name: &str,
173) -> FloeResult<EntityStateInspection> {
174    let (entity, path) = resolve_entity_state_target(config, config_base, entity_name)?;
175    let state = match &path.local_path {
176        Some(local_path) => read_entity_state(local_path)?
177            .map(|state| validate_entity_state(entity, state))
178            .transpose()?,
179        None => None,
180    };
181
182    Ok(EntityStateInspection {
183        entity_name: entity.name.clone(),
184        incremental_mode: entity.incremental_mode,
185        path,
186        state,
187    })
188}
189
190pub fn reset_entity_state_with_base(
191    config_path: &Path,
192    config_base: ConfigBase,
193    entity_name: &str,
194) -> FloeResult<bool> {
195    let config = crate::load_config(config_path)?;
196    let (entity, path) = resolve_entity_state_target(&config, config_base, entity_name)?;
197    let Some(local_path) = path.local_path.as_ref() else {
198        return Err(Box::new(ConfigError(format!(
199            "entity.name={} state path is not local and cannot be reset: {}",
200            entity.name, path.uri
201        ))));
202    };
203
204    if !local_path.exists() {
205        return Ok(false);
206    }
207
208    fs::remove_file(local_path)?;
209    Ok(true)
210}
211
212fn resolve_entity_state_target<'a>(
213    config: &'a RootConfig,
214    config_base: ConfigBase,
215    entity_name: &str,
216) -> FloeResult<(&'a EntityConfig, ResolvedPath)> {
217    let entity = config
218        .entities
219        .iter()
220        .find(|entity| entity.name == entity_name)
221        .ok_or_else(|| {
222            Box::new(ConfigError(format!("entity not found: {entity_name}")))
223                as Box<dyn std::error::Error + Send + Sync>
224        })?;
225    let resolver = StorageResolver::new(config, config_base)?;
226    let path = resolve_entity_state_path(&resolver, entity)?;
227    Ok((entity, path))
228}
229
230pub fn write_entity_state_atomic(path: &Path, state: &EntityState) -> FloeResult<()> {
231    let parent = path.parent().ok_or_else(|| {
232        Box::new(ConfigError(format!(
233            "state path has no parent directory: {}",
234            path.display()
235        ))) as Box<dyn std::error::Error + Send + Sync>
236    })?;
237    fs::create_dir_all(parent)?;
238
239    let mut temp = NamedTempFile::new_in(parent)?;
240    serde_json::to_writer_pretty(temp.as_file_mut(), state)?;
241    temp.as_file_mut().sync_all()?;
242    temp.persist(path).map_err(|err| err.error)?;
243    Ok(())
244}
245
246fn join_state_path(source_root: &str, entity_name: &str) -> String {
247    if source_root.is_empty() || source_root == "." {
248        format!(".floe/state/{entity_name}/{ENTITY_STATE_FILENAME}")
249    } else {
250        format!(
251            "{}/.floe/state/{entity_name}/{ENTITY_STATE_FILENAME}",
252            source_root.trim_end_matches(is_path_separator)
253        )
254    }
255}
256
257fn derive_source_root(
258    raw_path: &str,
259    source_format: &str,
260    resolved_local_path: Option<&Path>,
261) -> String {
262    let trimmed = raw_path.trim_end_matches(is_path_separator);
263    if trimmed.is_empty() {
264        return String::new();
265    }
266
267    if let Some(prefix) = prefix_before_first_glob(trimmed) {
268        if prefix.is_empty() {
269            return String::new();
270        }
271        if prefix.ends_with(is_path_separator) {
272            return prefix.trim_end_matches(is_path_separator).to_string();
273        }
274        return parent_like(prefix)
275            .unwrap_or(prefix)
276            .trim_end_matches(is_path_separator)
277            .to_string();
278    }
279
280    if let Some(path) = resolved_local_path.filter(|path| path.exists()) {
281        if path.is_dir() {
282            return trimmed.to_string();
283        }
284        if path.is_file() {
285            return parent_like(trimmed)
286                .unwrap_or(trimmed)
287                .trim_end_matches(is_path_separator)
288                .to_string();
289        }
290    }
291
292    if matches_source_file_suffix(trimmed, source_format) {
293        return parent_like(trimmed)
294            .unwrap_or(trimmed)
295            .trim_end_matches(is_path_separator)
296            .to_string();
297    }
298
299    trimmed.to_string()
300}
301
/// Returns the part of `value` before its first glob metacharacter
/// (`*`, `?` or `[`), or `None` when `value` contains no glob syntax.
fn prefix_before_first_glob(value: &str) -> Option<&str> {
    value
        .find(|ch: char| matches!(ch, '*' | '?' | '['))
        .map(|glob_start| &value[..glob_start])
}
306
307fn matches_source_file_suffix(value: &str, source_format: &str) -> bool {
308    let Some(segment) = value.rsplit(is_path_separator).next() else {
309        return false;
310    };
311    let segment = segment.to_ascii_lowercase();
312
313    extensions::suffixes_for_format(source_format)
314        .map(|suffixes| {
315            suffixes
316                .iter()
317                .any(|suffix| segment.ends_with(&suffix.to_ascii_lowercase()))
318        })
319        .unwrap_or(false)
320}
321
/// String-level "parent directory": everything before the last `/` or `\`.
///
/// A separator at index 0 yields the root separator itself rather than `""`.
/// Returns `None` when `value` contains no separator.
fn parent_like(value: &str) -> Option<&str> {
    let last_separator = value.rfind(|ch: char| ch == '/' || ch == '\\')?;
    let end = if last_separator == 0 { 1 } else { last_separator };
    Some(&value[..end])
}
331
/// Treats both `/` and `\` as path separators, so POSIX, Windows and URI-style
/// paths are handled uniformly.
fn is_path_separator(ch: char) -> bool {
    matches!(ch, '/' | '\\')
}
335
/// Reports whether `value` starts with one of the recognised remote schemes.
/// The recognised schemes are `s3://`, `gs://` and `abfs://`.
fn is_remote_uri(value: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 3] = ["s3://", "gs://", "abfs://"];
    REMOTE_SCHEMES
        .iter()
        .any(|scheme| value.starts_with(*scheme))
}
339
340pub fn validate_entity_state(entity: &EntityConfig, state: EntityState) -> FloeResult<EntityState> {
341    if state.schema != ENTITY_STATE_SCHEMA_V1 {
342        return Err(Box::new(ConfigError(format!(
343            "entity.name={} state schema mismatch: expected {}, got {}",
344            entity.name, ENTITY_STATE_SCHEMA_V1, state.schema
345        ))));
346    }
347
348    if state.entity != entity.name {
349        return Err(Box::new(ConfigError(format!(
350            "entity.name={} state entity mismatch: expected {}, got {}",
351            entity.name, entity.name, state.entity
352        ))));
353    }
354
355    Ok(state)
356}