1use std::collections::BTreeMap;
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use serde::{Deserialize, Serialize};
6use tempfile::NamedTempFile;
7
8use crate::config::{
9 ConfigBase, EntityConfig, IncrementalMode, ResolvedPath, RootConfig, StorageResolver,
10};
11use crate::io::storage::extensions;
12use crate::{ConfigError, FloeResult};
13
/// Schema identifier stored in the `schema` field of every state document.
///
/// `EntityState::new` writes this value and `validate_entity_state` rejects
/// any state whose `schema` differs from it.
pub const ENTITY_STATE_SCHEMA_V1: &str = "floe.state.file-ingest.v1";
/// File name used for the state document inside an entity's state directory
/// (`.../.floe/state/<entity>/state.json` for the default locations built in
/// this module).
pub const ENTITY_STATE_FILENAME: &str = "state.json";
16
/// Serialized ingest state of a single entity (the content of `state.json`).
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntityState {
    /// Schema identifier; expected to equal [`ENTITY_STATE_SCHEMA_V1`].
    pub schema: String,
    /// Name of the entity this state belongs to; checked against the
    /// configured entity name by `validate_entity_state`.
    pub entity: String,
    /// Time of the last update, if any.
    /// NOTE(review): stored as an opaque string here; presumably an
    /// RFC 3339 timestamp — confirm against the writer.
    pub updated_at: Option<String>,
    /// Per-file records, kept in key order by the `BTreeMap`. Defaults to an
    /// empty map when the field is missing from the JSON document.
    /// NOTE(review): keys are presumably source file paths/URIs — confirm
    /// against the code that populates the map.
    #[serde(default)]
    pub files: BTreeMap<String, EntityFileState>,
}
25
26impl EntityState {
27 pub fn new(entity: impl Into<String>) -> Self {
28 Self {
29 schema: ENTITY_STATE_SCHEMA_V1.to_string(),
30 entity: entity.into(),
31 updated_at: None,
32 files: BTreeMap::new(),
33 }
34 }
35}
36
/// State recorded for one file inside [`EntityState::files`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntityFileState {
    /// When the file was processed.
    /// NOTE(review): opaque string in this module; timestamp format is
    /// decided by the writer — confirm.
    pub processed_at: String,
    /// File size, when known.
    /// NOTE(review): presumably bytes — confirm against the writer.
    pub size: Option<u64>,
    /// File modification time, when known (opaque string in this module).
    pub mtime: Option<String>,
}
43
/// Result of inspecting an entity's state, as produced by
/// `inspect_entity_state`.
#[derive(Debug, Clone)]
pub struct EntityStateInspection {
    /// Name of the inspected entity, copied from its configuration.
    pub entity_name: String,
    /// Incremental mode copied from the entity configuration.
    pub incremental_mode: IncrementalMode,
    /// Resolved location of the entity's state document.
    pub path: ResolvedPath,
    /// The validated state that was read, or `None` when no state could be
    /// read from the local path (no file there, or no local path at all).
    pub state: Option<EntityState>,
}
51
52pub fn resolve_entity_state_path(
53 resolver: &StorageResolver,
54 entity: &EntityConfig,
55) -> FloeResult<ResolvedPath> {
56 if let Some(state) = &entity.state {
57 if let Some(path) = state.path.as_deref() {
58 let resolved = if is_remote_uri(path) {
59 resolver.resolve_path(
60 &entity.name,
61 "entity.state.path",
62 entity.source.storage.as_deref(),
63 path,
64 )?
65 } else {
66 resolver.resolve_local_path(path)?
67 };
68 return Ok(with_local_state_fallback(resolver, entity, resolved));
69 }
70 }
71
72 let resolved_source = resolver.resolve_path(
73 &entity.name,
74 "entity.source.path",
75 entity.source.storage.as_deref(),
76 &entity.source.path,
77 )?;
78 let source_root = derive_source_root(
79 &entity.source.path,
80 &entity.source.format,
81 resolved_source.local_path.as_deref(),
82 );
83 let default_path = join_state_path(&source_root, &entity.name);
84 let resolved = resolver.resolve_path(
85 &entity.name,
86 "entity.state.path",
87 entity.source.storage.as_deref(),
88 &default_path,
89 )?;
90 Ok(with_local_state_fallback(resolver, entity, resolved))
91}
92
93fn with_local_state_fallback(
94 resolver: &StorageResolver,
95 entity: &EntityConfig,
96 mut resolved: ResolvedPath,
97) -> ResolvedPath {
98 if resolved.local_path.is_none() {
99 resolved.local_path = Some(default_local_state_cache_path(
100 resolver,
101 entity,
102 &resolved.uri,
103 ));
104 }
105 resolved
106}
107
108fn default_local_state_cache_path(
109 resolver: &StorageResolver,
110 entity: &EntityConfig,
111 resolved_uri: &str,
112) -> PathBuf {
113 if resolver.config_is_remote() {
114 remote_config_state_cache_root()
115 .join(short_stable_hash_hex(resolved_uri))
116 .join(&entity.name)
117 .join(ENTITY_STATE_FILENAME)
118 } else {
119 resolver
120 .config_local_dir()
121 .join(".floe/state")
122 .join(&entity.name)
123 .join(ENTITY_STATE_FILENAME)
124 }
125}
126
/// Returns the root directory for locally cached state when the config is
/// remote.
///
/// Resolution order:
/// 1. `$XDG_CACHE_HOME/floe/state` when `XDG_CACHE_HOME` holds an absolute
///    path (relative values are ignored);
/// 2. `$HOME/.cache/floe/state` when `HOME` is set and non-empty;
/// 3. `<current dir>/.floe/state`, or `./.floe/state` when the current
///    directory cannot be determined.
fn remote_config_state_cache_root() -> PathBuf {
    let xdg_cache_home = std::env::var_os("XDG_CACHE_HOME")
        .map(PathBuf::from)
        .filter(|dir| dir.is_absolute());
    if let Some(dir) = xdg_cache_home {
        return dir.join("floe/state");
    }

    // An empty HOME is treated like an unset one: joining onto "" would
    // silently produce a relative `.cache/floe/state` path.
    let home = std::env::var_os("HOME").filter(|home| !home.is_empty());
    if let Some(home) = home {
        return PathBuf::from(home).join(".cache/floe/state");
    }

    std::env::current_dir()
        .unwrap_or_else(|_| PathBuf::from("."))
        .join(".floe/state")
}
141
/// Hashes `value` with 64-bit FNV-1a and returns the digest as 16 lowercase
/// hexadecimal characters.
///
/// The function is deterministic, so the same input always maps to the same
/// directory name.
fn short_stable_hash_hex(value: &str) -> String {
    const FNV_OFFSET_BASIS: u64 = 0xcbf29ce484222325;
    const FNV_PRIME: u64 = 0x100000001b3;

    let digest = value.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    format!("{digest:016x}")
}
150
151pub fn read_entity_state(path: &Path) -> FloeResult<Option<EntityState>> {
152 if !path.exists() {
153 return Ok(None);
154 }
155 let payload = fs::read_to_string(path)?;
156 let state: EntityState = serde_json::from_str(&payload)?;
157 Ok(Some(state))
158}
159
160pub fn inspect_entity_state_with_base(
161 config_path: &Path,
162 config_base: ConfigBase,
163 entity_name: &str,
164) -> FloeResult<EntityStateInspection> {
165 let config = crate::load_config(config_path)?;
166 inspect_entity_state(&config, config_base, entity_name)
167}
168
169pub fn inspect_entity_state(
170 config: &RootConfig,
171 config_base: ConfigBase,
172 entity_name: &str,
173) -> FloeResult<EntityStateInspection> {
174 let (entity, path) = resolve_entity_state_target(config, config_base, entity_name)?;
175 let state = match &path.local_path {
176 Some(local_path) => read_entity_state(local_path)?
177 .map(|state| validate_entity_state(entity, state))
178 .transpose()?,
179 None => None,
180 };
181
182 Ok(EntityStateInspection {
183 entity_name: entity.name.clone(),
184 incremental_mode: entity.incremental_mode,
185 path,
186 state,
187 })
188}
189
190pub fn reset_entity_state_with_base(
191 config_path: &Path,
192 config_base: ConfigBase,
193 entity_name: &str,
194) -> FloeResult<bool> {
195 let config = crate::load_config(config_path)?;
196 let (entity, path) = resolve_entity_state_target(&config, config_base, entity_name)?;
197 let Some(local_path) = path.local_path.as_ref() else {
198 return Err(Box::new(ConfigError(format!(
199 "entity.name={} state path is not local and cannot be reset: {}",
200 entity.name, path.uri
201 ))));
202 };
203
204 if !local_path.exists() {
205 return Ok(false);
206 }
207
208 fs::remove_file(local_path)?;
209 Ok(true)
210}
211
212fn resolve_entity_state_target<'a>(
213 config: &'a RootConfig,
214 config_base: ConfigBase,
215 entity_name: &str,
216) -> FloeResult<(&'a EntityConfig, ResolvedPath)> {
217 let entity = config
218 .entities
219 .iter()
220 .find(|entity| entity.name == entity_name)
221 .ok_or_else(|| {
222 Box::new(ConfigError(format!("entity not found: {entity_name}")))
223 as Box<dyn std::error::Error + Send + Sync>
224 })?;
225 let resolver = StorageResolver::new(config, config_base)?;
226 let path = resolve_entity_state_path(&resolver, entity)?;
227 Ok((entity, path))
228}
229
230pub fn write_entity_state_atomic(path: &Path, state: &EntityState) -> FloeResult<()> {
231 let parent = path.parent().ok_or_else(|| {
232 Box::new(ConfigError(format!(
233 "state path has no parent directory: {}",
234 path.display()
235 ))) as Box<dyn std::error::Error + Send + Sync>
236 })?;
237 fs::create_dir_all(parent)?;
238
239 let mut temp = NamedTempFile::new_in(parent)?;
240 serde_json::to_writer_pretty(temp.as_file_mut(), state)?;
241 temp.as_file_mut().sync_all()?;
242 temp.persist(path).map_err(|err| err.error)?;
243 Ok(())
244}
245
246fn join_state_path(source_root: &str, entity_name: &str) -> String {
247 if source_root.is_empty() || source_root == "." {
248 format!(".floe/state/{entity_name}/{ENTITY_STATE_FILENAME}")
249 } else {
250 format!(
251 "{}/.floe/state/{entity_name}/{ENTITY_STATE_FILENAME}",
252 source_root.trim_end_matches(is_path_separator)
253 )
254 }
255}
256
257fn derive_source_root(
258 raw_path: &str,
259 source_format: &str,
260 resolved_local_path: Option<&Path>,
261) -> String {
262 let trimmed = raw_path.trim_end_matches(is_path_separator);
263 if trimmed.is_empty() {
264 return String::new();
265 }
266
267 if let Some(prefix) = prefix_before_first_glob(trimmed) {
268 if prefix.is_empty() {
269 return String::new();
270 }
271 if prefix.ends_with(is_path_separator) {
272 return prefix.trim_end_matches(is_path_separator).to_string();
273 }
274 return parent_like(prefix)
275 .unwrap_or(prefix)
276 .trim_end_matches(is_path_separator)
277 .to_string();
278 }
279
280 if let Some(path) = resolved_local_path.filter(|path| path.exists()) {
281 if path.is_dir() {
282 return trimmed.to_string();
283 }
284 if path.is_file() {
285 return parent_like(trimmed)
286 .unwrap_or(trimmed)
287 .trim_end_matches(is_path_separator)
288 .to_string();
289 }
290 }
291
292 if matches_source_file_suffix(trimmed, source_format) {
293 return parent_like(trimmed)
294 .unwrap_or(trimmed)
295 .trim_end_matches(is_path_separator)
296 .to_string();
297 }
298
299 trimmed.to_string()
300}
301
/// Returns the part of `value` that precedes its first glob character
/// (`*`, `?` or `[`), or `None` when `value` contains none of them.
fn prefix_before_first_glob(value: &str) -> Option<&str> {
    value
        .char_indices()
        .find(|&(_, ch)| matches!(ch, '*' | '?' | '['))
        .map(|(glob_start, _)| &value[..glob_start])
}
306
307fn matches_source_file_suffix(value: &str, source_format: &str) -> bool {
308 let Some(segment) = value.rsplit(is_path_separator).next() else {
309 return false;
310 };
311 let segment = segment.to_ascii_lowercase();
312
313 extensions::suffixes_for_format(source_format)
314 .map(|suffixes| {
315 suffixes
316 .iter()
317 .any(|suffix| segment.ends_with(&suffix.to_ascii_lowercase()))
318 })
319 .unwrap_or(false)
320}
321
322fn parent_like(value: &str) -> Option<&str> {
323 value.rfind(is_path_separator).map(|index| {
324 if index == 0 {
325 &value[..1]
326 } else {
327 &value[..index]
328 }
329 })
330}
331
/// Returns `true` for `/` and `\`, the two characters this module treats as
/// path separators.
fn is_path_separator(ch: char) -> bool {
    matches!(ch, '/' | '\\')
}
335
/// Returns `true` when `value` starts with one of the remote URI schemes
/// recognized here: `s3://`, `gs://` or `abfs://` (case-sensitive).
fn is_remote_uri(value: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 3] = ["s3://", "gs://", "abfs://"];
    REMOTE_SCHEMES
        .iter()
        .any(|scheme| value.starts_with(*scheme))
}
339
340pub fn validate_entity_state(entity: &EntityConfig, state: EntityState) -> FloeResult<EntityState> {
341 if state.schema != ENTITY_STATE_SCHEMA_V1 {
342 return Err(Box::new(ConfigError(format!(
343 "entity.name={} state schema mismatch: expected {}, got {}",
344 entity.name, ENTITY_STATE_SCHEMA_V1, state.schema
345 ))));
346 }
347
348 if state.entity != entity.name {
349 return Err(Box::new(ConfigError(format!(
350 "entity.name={} state entity mismatch: expected {}, got {}",
351 entity.name, entity.name, state.entity
352 ))));
353 }
354
355 Ok(state)
356}