Skip to main content

floe_core/state/
mod.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use serde::{Deserialize, Serialize};
6use tempfile::NamedTempFile;
7
8use crate::config::{
9    ConfigBase, EntityConfig, IncrementalMode, ResolvedPath, RootConfig, StorageResolver,
10};
11use crate::io::storage::{
12    extensions, local::LocalClient, CloudClient, ConditionalWrite, StorageClient, StoredObject,
13};
14use crate::{ConfigError, FloeResult};
15
/// Legacy state schema identifier. Documents tagged with it are re-tagged as
/// [`ENTITY_STATE_SCHEMA_V2`] when parsed, and their `claims` map is cleared.
pub const ENTITY_STATE_SCHEMA_V1: &str = "floe.state.file-ingest.v1";
/// Current state schema identifier, used by `EntityState::new` and stamped on
/// every conditional write of the state document.
pub const ENTITY_STATE_SCHEMA_V2: &str = "floe.state.file-ingest.v2";
/// File name used for the default per-entity state location
/// (`<source root>/.floe/state/<entity>/state.json`).
pub const ENTITY_STATE_FILENAME: &str = "state.json";
/// Number of compare-and-swap attempts a state update makes before giving up.
const STATE_CAS_RETRIES: usize = 5;
/// Lifetime of a file claim in seconds (one hour). Claims past their expiry
/// are pruned the next time inputs are claimed.
pub const CLAIM_TTL_SECONDS: i64 = 60 * 60;
21
22#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
23pub struct EntityState {
24    pub schema: String,
25    pub entity: String,
26    pub updated_at: Option<String>,
27    #[serde(default)]
28    pub files: BTreeMap<String, EntityFileState>,
29    #[serde(default)]
30    pub claims: BTreeMap<String, EntityFileClaim>,
31}
32
33impl EntityState {
34    pub fn new(entity: impl Into<String>) -> Self {
35        Self {
36            schema: ENTITY_STATE_SCHEMA_V2.to_string(),
37            entity: entity.into(),
38            updated_at: None,
39            files: BTreeMap::new(),
40            claims: BTreeMap::new(),
41        }
42    }
43}
44
/// Record of a source file that a run finished processing.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntityFileState {
    /// RFC 3339 timestamp taken when the file's claim was promoted.
    pub processed_at: String,
    /// Source size copied from the claim (originally `InputFile::source_size`), if known.
    pub size: Option<u64>,
    /// Source modification time copied from the claim, if known.
    pub mtime: Option<String>,
}
51
/// An in-flight reservation of one source file by one run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EntityFileClaim {
    /// Identifier of the run holding the claim.
    pub run_id: String,
    /// RFC 3339 timestamp of when the claim was taken.
    pub acquired_at: String,
    /// RFC 3339 timestamp after which the claim counts as expired; expired (or
    /// unparseable) claims are dropped by a later `claim_entity_inputs` call.
    pub expires_at: String,
    /// Source size observed at claim time (from `InputFile::source_size`), if known.
    pub size: Option<u64>,
    /// Source modification time observed at claim time, if known.
    pub mtime: Option<String>,
}
60
/// Read-only view of an entity's state location and contents, as produced by
/// `inspect_entity_state`.
#[derive(Debug, Clone)]
pub struct EntityStateInspection {
    /// Name of the inspected entity.
    pub entity_name: String,
    /// The entity's configured incremental mode.
    pub incremental_mode: IncrementalMode,
    /// Resolved location of the state document.
    pub path: ResolvedPath,
    /// Parsed state, or `None` when no state object exists yet.
    pub state: Option<EntityState>,
}
68
69pub fn resolve_entity_state_path(
70    resolver: &StorageResolver,
71    entity: &EntityConfig,
72) -> FloeResult<ResolvedPath> {
73    if let Some(state) = &entity.state {
74        if let Some(path) = state.path.as_deref() {
75            let resolved = if is_remote_uri(path) {
76                resolver.resolve_path(
77                    &entity.name,
78                    "entity.state.path",
79                    entity.source.storage.as_deref(),
80                    path,
81                )?
82            } else {
83                resolver.resolve_local_path(path)?
84            };
85            return Ok(resolved);
86        }
87    }
88
89    let resolved_source = resolver.resolve_path(
90        &entity.name,
91        "entity.source.path",
92        entity.source.storage.as_deref(),
93        &entity.source.path,
94    )?;
95    let source_root = derive_source_root(
96        &entity.source.path,
97        &entity.source.format,
98        resolved_source.local_path.as_deref(),
99    );
100    let default_path = join_state_path(&source_root, &entity.name);
101    let resolved = resolver.resolve_path(
102        &entity.name,
103        "entity.state.path",
104        entity.source.storage.as_deref(),
105        &default_path,
106    )?;
107    Ok(resolved)
108}
109
110pub fn read_entity_state(path: &Path) -> FloeResult<Option<EntityState>> {
111    if !path.exists() {
112        return Ok(None);
113    }
114    let payload = fs::read_to_string(path)?;
115    let state = parse_entity_state(payload.as_bytes())?;
116    Ok(Some(state))
117}
118
119fn parse_entity_state(payload: &[u8]) -> FloeResult<EntityState> {
120    let mut state: EntityState = serde_json::from_slice(payload)?;
121    if state.schema == ENTITY_STATE_SCHEMA_V1 {
122        state.schema = ENTITY_STATE_SCHEMA_V2.to_string();
123        state.claims.clear();
124    }
125    Ok(state)
126}
127
128#[derive(Debug, Clone)]
129pub enum EntityStateTarget {
130    Local { path: PathBuf, uri: String },
131    Remote { storage: String, uri: String },
132}
133
134#[derive(Debug, Clone)]
135pub struct LoadedEntityState {
136    pub target: EntityStateTarget,
137    pub state: EntityState,
138    pub version: Option<String>,
139    pub existed: bool,
140}
141
142#[derive(Debug, Clone)]
143pub struct ClaimedEntityState {
144    pub target: EntityStateTarget,
145    pub state: EntityState,
146    pub version: Option<String>,
147}
148
149#[derive(Debug, Clone)]
150pub struct EntityStateClaimOutcome {
151    pub pending_inputs: Vec<crate::io::format::InputFile>,
152    pub claimed_state: Option<ClaimedEntityState>,
153    pub active_claims: Vec<String>,
154    pub already_processed: Vec<(crate::io::format::InputFile, EntityFileState)>,
155}
156
157pub fn claim_entity_inputs(
158    resolver: &StorageResolver,
159    cloud: &mut CloudClient,
160    entity: &EntityConfig,
161    run_id: &str,
162    input_files: Vec<crate::io::format::InputFile>,
163) -> FloeResult<EntityStateClaimOutcome> {
164    if input_files.is_empty() {
165        return Ok(EntityStateClaimOutcome {
166            pending_inputs: Vec::new(),
167            claimed_state: None,
168            active_claims: Vec::new(),
169            already_processed: Vec::new(),
170        });
171    }
172
173    for _ in 0..STATE_CAS_RETRIES {
174        let mut loaded = load_entity_state(resolver, cloud, entity)?;
175        remove_expired_claims(&mut loaded.state);
176        let mut pending_inputs = Vec::new();
177        let mut active_claims = Vec::new();
178        let mut already_processed = Vec::new();
179        let acquired_at = now_rfc3339();
180        let expires_at = rfc3339_after_seconds(CLAIM_TTL_SECONDS);
181
182        for input_file in &input_files {
183            if let Some(recorded) = loaded.state.files.get(&input_file.source_uri) {
184                already_processed.push((input_file.clone(), recorded.clone()));
185                continue;
186            }
187            match loaded.state.claims.get(&input_file.source_uri) {
188                Some(_) => {
189                    active_claims.push(input_file.source_uri.clone());
190                }
191                None => {
192                    loaded.state.claims.insert(
193                        input_file.source_uri.clone(),
194                        EntityFileClaim {
195                            run_id: run_id.to_string(),
196                            acquired_at: acquired_at.clone(),
197                            expires_at: expires_at.clone(),
198                            size: input_file.source_size,
199                            mtime: input_file.source_mtime.clone(),
200                        },
201                    );
202                    pending_inputs.push(input_file.clone());
203                }
204            }
205        }
206
207        if pending_inputs.is_empty() {
208            if active_claims.is_empty() {
209                let _ = persist_loaded_state(cloud, resolver, &loaded)?;
210            }
211            return Ok(EntityStateClaimOutcome {
212                pending_inputs,
213                claimed_state: None,
214                active_claims,
215                already_processed,
216            });
217        }
218
219        loaded.state.schema = ENTITY_STATE_SCHEMA_V2.to_string();
220        loaded.state.updated_at = Some(acquired_at);
221        match persist_loaded_state(cloud, resolver, &loaded)? {
222            Some(version) => {
223                return Ok(EntityStateClaimOutcome {
224                    pending_inputs,
225                    claimed_state: Some(ClaimedEntityState {
226                        target: loaded.target,
227                        state: loaded.state,
228                        version,
229                    }),
230                    active_claims,
231                    already_processed,
232                });
233            }
234            None => continue,
235        }
236    }
237
238    Err(Box::new(ConfigError(format!(
239        "entity.name={} incremental state update conflicted after {STATE_CAS_RETRIES} retries",
240        entity.name
241    ))))
242}
243
244/// Full-refresh variant of `claim_entity_inputs`.
245///
246/// Loads existing state to get the current CAS version, then writes a blank
247/// `EntityState` containing fresh claims for ALL `input_files` — discarding
248/// the historical `files` and `claims` maps. On successful commit the state
249/// file will contain only the files processed in this run.
250///
251/// Returns `None` when `input_files` is empty (nothing to claim).
252pub fn claim_all_entity_inputs(
253    resolver: &StorageResolver,
254    cloud: &mut CloudClient,
255    entity: &EntityConfig,
256    run_id: &str,
257    input_files: Vec<crate::io::format::InputFile>,
258) -> FloeResult<Option<ClaimedEntityState>> {
259    let acquired_at = now_rfc3339();
260    let expires_at = rfc3339_after_seconds(CLAIM_TTL_SECONDS);
261
262    for _ in 0..STATE_CAS_RETRIES {
263        // Read only to obtain the current CAS version; content is discarded.
264        let loaded = load_entity_state(resolver, cloud, entity)?;
265
266        let mut fresh_state = EntityState::new(&entity.name);
267        fresh_state.files = loaded.state.files.clone();
268        fresh_state.updated_at = Some(acquired_at.clone());
269        for input_file in &input_files {
270            fresh_state.claims.insert(
271                input_file.source_uri.clone(),
272                EntityFileClaim {
273                    run_id: run_id.to_string(),
274                    acquired_at: acquired_at.clone(),
275                    expires_at: expires_at.clone(),
276                    size: input_file.source_size,
277                    mtime: input_file.source_mtime.clone(),
278                },
279            );
280        }
281
282        let fresh_loaded = LoadedEntityState {
283            target: loaded.target,
284            state: fresh_state,
285            version: loaded.version,
286            existed: loaded.existed,
287        };
288        match persist_loaded_state(cloud, resolver, &fresh_loaded)? {
289            Some(version) => {
290                return Ok(Some(ClaimedEntityState {
291                    target: fresh_loaded.target,
292                    state: fresh_loaded.state,
293                    version,
294                }));
295            }
296            None => continue,
297        }
298    }
299
300    Err(Box::new(ConfigError(format!(
301        "entity.name={} full-refresh state write conflicted after {STATE_CAS_RETRIES} retries",
302        entity.name
303    ))))
304}
305
306pub fn promote_claimed_entity_state(
307    resolver: &StorageResolver,
308    cloud: &mut CloudClient,
309    entity_name: &str,
310    run_id: &str,
311    claimed: &ClaimedEntityState,
312) -> FloeResult<()> {
313    mutate_claimed_state(resolver, cloud, entity_name, claimed, |state, our_uris| {
314        let processed_at = now_rfc3339();
315        let claimed_files: Vec<String> = state
316            .claims
317            .iter()
318            .filter(|(uri, claim)| claim.run_id == run_id && our_uris.contains(*uri))
319            .map(|(source_uri, _)| source_uri.clone())
320            .collect();
321        for source_uri in claimed_files {
322            if let Some(claim) = state.claims.remove(&source_uri) {
323                state.files.insert(
324                    source_uri,
325                    EntityFileState {
326                        processed_at: processed_at.clone(),
327                        size: claim.size,
328                        mtime: claim.mtime,
329                    },
330                );
331            }
332        }
333        state.updated_at = Some(processed_at);
334    })
335}
336
337pub fn promote_full_refresh_claimed_entity_state(
338    resolver: &StorageResolver,
339    cloud: &mut CloudClient,
340    entity_name: &str,
341    run_id: &str,
342    claimed: &ClaimedEntityState,
343) -> FloeResult<()> {
344    mutate_claimed_state(resolver, cloud, entity_name, claimed, |state, our_uris| {
345        let processed_at = now_rfc3339();
346        let claimed_files: Vec<String> = state
347            .claims
348            .iter()
349            .filter(|(uri, claim)| claim.run_id == run_id && our_uris.contains(*uri))
350            .map(|(uri, _)| uri.clone())
351            .collect();
352        state.files.clear();
353        for source_uri in claimed_files {
354            if let Some(claim) = state.claims.remove(&source_uri) {
355                state.files.insert(
356                    source_uri,
357                    EntityFileState {
358                        processed_at: processed_at.clone(),
359                        size: claim.size,
360                        mtime: claim.mtime,
361                    },
362                );
363            }
364        }
365        state.updated_at = Some(processed_at);
366    })
367}
368
369pub fn release_claimed_entity_state(
370    resolver: &StorageResolver,
371    cloud: &mut CloudClient,
372    entity_name: &str,
373    run_id: &str,
374    claimed: &ClaimedEntityState,
375) -> FloeResult<()> {
376    mutate_claimed_state(resolver, cloud, entity_name, claimed, |state, our_uris| {
377        state
378            .claims
379            .retain(|uri, claim| !(claim.run_id == run_id && our_uris.contains(uri)));
380        state.updated_at = Some(now_rfc3339());
381    })
382}
383
384pub fn renew_claimed_entity_state(
385    resolver: &StorageResolver,
386    cloud: &mut CloudClient,
387    entity_name: &str,
388    run_id: &str,
389    claimed: &ClaimedEntityState,
390) -> FloeResult<()> {
391    mutate_claimed_state(resolver, cloud, entity_name, claimed, |state, our_uris| {
392        let now = now_rfc3339();
393        let expires_at = rfc3339_after_seconds(CLAIM_TTL_SECONDS);
394        for (uri, claim) in state.claims.iter_mut() {
395            if claim.run_id == run_id && our_uris.contains(uri) {
396                claim.expires_at = expires_at.clone();
397            }
398        }
399        state.updated_at = Some(now);
400    })
401}
402
403fn mutate_claimed_state(
404    resolver: &StorageResolver,
405    cloud: &mut CloudClient,
406    entity_name: &str,
407    claimed: &ClaimedEntityState,
408    mutate: impl Fn(&mut EntityState, &HashSet<String>),
409) -> FloeResult<()> {
410    let our_uris: HashSet<String> = claimed.state.claims.keys().cloned().collect();
411    for attempt in 0..STATE_CAS_RETRIES {
412        let mut loaded = if attempt == 0 {
413            LoadedEntityState {
414                target: claimed.target.clone(),
415                state: claimed.state.clone(),
416                version: claimed.version.clone(),
417                existed: claimed.version.is_some(),
418            }
419        } else {
420            load_target_state_with_entity_name(
421                cloud,
422                resolver,
423                entity_name,
424                claimed.target.clone(),
425            )?
426        };
427        mutate(&mut loaded.state, &our_uris);
428        loaded.state.schema = ENTITY_STATE_SCHEMA_V2.to_string();
429        let persisted = if loaded.state.files.is_empty() && loaded.state.claims.is_empty() {
430            delete_loaded_state(cloud, resolver, &loaded)?
431        } else {
432            persist_loaded_state(cloud, resolver, &loaded)?
433        };
434        if persisted.is_some() {
435            return Ok(());
436        }
437    }
438    Err(Box::new(ConfigError(format!(
439        "entity.name={} incremental state update conflicted after {STATE_CAS_RETRIES} retries",
440        entity_name
441    ))))
442}
443
444pub fn inspect_entity_state_with_base(
445    config_path: &Path,
446    config_base: ConfigBase,
447    entity_name: &str,
448) -> FloeResult<EntityStateInspection> {
449    let config = crate::load_config(config_path)?;
450    inspect_entity_state(&config, config_base, entity_name)
451}
452
453pub fn inspect_entity_state(
454    config: &RootConfig,
455    config_base: ConfigBase,
456    entity_name: &str,
457) -> FloeResult<EntityStateInspection> {
458    let (entity, path) = resolve_entity_state_target(config, config_base.clone(), entity_name)?;
459    let resolver = StorageResolver::new(config, config_base)?;
460    let target = state_target_from_resolved(&path)?;
461    let mut cloud = CloudClient::new();
462    let loaded = load_target_state_with_resolver(&mut cloud, &resolver, entity, target)?;
463    let state = loaded.existed.then_some(loaded.state);
464
465    Ok(EntityStateInspection {
466        entity_name: entity.name.clone(),
467        incremental_mode: entity.incremental_mode,
468        path,
469        state,
470    })
471}
472
473pub fn reset_entity_state(
474    config: &RootConfig,
475    config_base: ConfigBase,
476    entity_name: &str,
477) -> FloeResult<bool> {
478    let (entity, path) = resolve_entity_state_target(config, config_base.clone(), entity_name)?;
479    let target = state_target_from_resolved(&path)?;
480    match target {
481        EntityStateTarget::Local { path, .. } => {
482            if path.exists() {
483                fs::remove_file(&path)?;
484                return Ok(true);
485            }
486            Ok(false)
487        }
488        EntityStateTarget::Remote { storage, uri } => {
489            let mut cloud = CloudClient::new();
490            let resolver = StorageResolver::new(config, config_base)?;
491            let client = cloud.client_for_context(
492                &resolver,
493                &storage,
494                &format!("entity.name={}", entity.name),
495            )?;
496            let Some(object) = client.read_object(&uri)? else {
497                return Ok(false);
498            };
499            match client.delete_object_conditional(&uri, Some(&object.version))? {
500                ConditionalWrite::Written { .. } => Ok(true),
501                ConditionalWrite::Conflict => Err(Box::new(ConfigError(format!(
502                    "entity.name={} remote state changed while resetting: {}",
503                    entity.name, uri
504                )))),
505            }
506        }
507    }
508}
509
510pub fn reset_entity_state_with_base(
511    config_path: &Path,
512    config_base: ConfigBase,
513    entity_name: &str,
514) -> FloeResult<bool> {
515    let config = crate::load_config(config_path)?;
516    reset_entity_state(&config, config_base, entity_name)
517}
518
519fn resolve_entity_state_target<'a>(
520    config: &'a RootConfig,
521    config_base: ConfigBase,
522    entity_name: &str,
523) -> FloeResult<(&'a EntityConfig, ResolvedPath)> {
524    let entity = config
525        .entities
526        .iter()
527        .find(|entity| entity.name == entity_name)
528        .ok_or_else(|| {
529            Box::new(ConfigError(format!("entity not found: {entity_name}")))
530                as Box<dyn std::error::Error + Send + Sync>
531        })?;
532    let resolver = StorageResolver::new(config, config_base)?;
533    let path = resolve_entity_state_path(&resolver, entity)?;
534    Ok((entity, path))
535}
536
537pub fn write_entity_state_atomic(path: &Path, state: &EntityState) -> FloeResult<()> {
538    let parent = path.parent().ok_or_else(|| {
539        Box::new(ConfigError(format!(
540            "state path has no parent directory: {}",
541            path.display()
542        ))) as Box<dyn std::error::Error + Send + Sync>
543    })?;
544    fs::create_dir_all(parent)?;
545
546    let mut temp = NamedTempFile::new_in(parent)?;
547    serde_json::to_writer_pretty(temp.as_file_mut(), state)?;
548    temp.as_file_mut().sync_all()?;
549    temp.persist(path).map_err(|err| err.error)?;
550    Ok(())
551}
552
553fn load_entity_state(
554    resolver: &StorageResolver,
555    cloud: &mut CloudClient,
556    entity: &EntityConfig,
557) -> FloeResult<LoadedEntityState> {
558    let resolved = resolve_entity_state_path(resolver, entity)?;
559    let target = state_target_from_resolved(&resolved)?;
560    load_target_state_with_resolver(cloud, resolver, entity, target)
561}
562
563fn load_target_state_with_resolver(
564    cloud: &mut CloudClient,
565    resolver: &StorageResolver,
566    entity: &EntityConfig,
567    target: EntityStateTarget,
568) -> FloeResult<LoadedEntityState> {
569    load_target_state_with_entity_name(cloud, resolver, &entity.name, target)
570}
571
572fn load_target_state_with_entity_name(
573    cloud: &mut CloudClient,
574    resolver: &StorageResolver,
575    entity_name: &str,
576    target: EntityStateTarget,
577) -> FloeResult<LoadedEntityState> {
578    match target {
579        EntityStateTarget::Local { path, uri } => {
580            let object = LocalClient::new().read_object(&uri)?;
581            let (state, version, existed) = resolve_loaded_state(entity_name, object)?;
582            Ok(LoadedEntityState {
583                target: EntityStateTarget::Local { path, uri },
584                state,
585                version,
586                existed,
587            })
588        }
589        EntityStateTarget::Remote { storage, uri } => {
590            let client = cloud.client_for_context(
591                resolver,
592                &storage,
593                &format!("entity.name={entity_name}"),
594            )?;
595            let object = client.read_object(&uri)?;
596            let (state, version, existed) = resolve_loaded_state(entity_name, object)?;
597            Ok(LoadedEntityState {
598                target: EntityStateTarget::Remote { storage, uri },
599                state,
600                version,
601                existed,
602            })
603        }
604    }
605}
606
607fn resolve_loaded_state(
608    entity_name: &str,
609    object: Option<StoredObject>,
610) -> FloeResult<(EntityState, Option<String>, bool)> {
611    match object {
612        Some(object) => Ok((
613            validate_entity_state_name(entity_name, parse_entity_state(&object.body)?)?,
614            Some(object.version),
615            true,
616        )),
617        None => Ok((EntityState::new(entity_name), None, false)),
618    }
619}
620
621fn with_state_client<R, F>(
622    cloud: &mut CloudClient,
623    resolver: &StorageResolver,
624    target: &EntityStateTarget,
625    f: F,
626) -> FloeResult<R>
627where
628    F: FnOnce(&str, &dyn StorageClient) -> FloeResult<R>,
629{
630    match target {
631        EntityStateTarget::Local { uri, .. } => f(uri, &LocalClient::new()),
632        EntityStateTarget::Remote { uri, storage } => {
633            let client = cloud.client_for_context(resolver, storage, "entity state")?;
634            f(uri, client)
635        }
636    }
637}
638
639fn conditional_write_to_version(cw: ConditionalWrite) -> Option<Option<String>> {
640    match cw {
641        ConditionalWrite::Written { version } => Some(Some(version)),
642        ConditionalWrite::Conflict => None,
643    }
644}
645
646fn persist_loaded_state(
647    cloud: &mut CloudClient,
648    resolver: &StorageResolver,
649    loaded: &LoadedEntityState,
650) -> FloeResult<Option<Option<String>>> {
651    let mut state = loaded.state.clone();
652    state.schema = ENTITY_STATE_SCHEMA_V2.to_string();
653    let body = serde_json::to_vec_pretty(&state)?;
654    let version = loaded.version.as_deref();
655    let cw = with_state_client(cloud, resolver, &loaded.target, |uri, client| {
656        client.write_object_conditional(uri, version, &body)
657    })?;
658    Ok(conditional_write_to_version(cw))
659}
660
661fn delete_loaded_state(
662    cloud: &mut CloudClient,
663    resolver: &StorageResolver,
664    loaded: &LoadedEntityState,
665) -> FloeResult<Option<Option<String>>> {
666    let version = loaded.version.as_deref();
667    let cw = with_state_client(cloud, resolver, &loaded.target, |uri, client| {
668        client.delete_object_conditional(uri, version)
669    })?;
670    Ok(conditional_write_to_version(cw))
671}
672
673fn state_target_from_resolved(resolved: &ResolvedPath) -> FloeResult<EntityStateTarget> {
674    if let Some(path) = &resolved.local_path {
675        return Ok(EntityStateTarget::Local {
676            path: path.clone(),
677            uri: resolved.uri.clone(),
678        });
679    }
680    if is_remote_uri(&resolved.uri) {
681        return Ok(EntityStateTarget::Remote {
682            storage: resolved.storage.clone(),
683            uri: resolved.uri.clone(),
684        });
685    }
686    Err(Box::new(ConfigError(format!(
687        "state path is neither local nor supported remote: {}",
688        resolved.uri
689    ))))
690}
691
692fn remove_expired_claims(state: &mut EntityState) {
693    let now = time::OffsetDateTime::now_utc();
694    state.claims.retain(|_, claim| {
695        time::OffsetDateTime::parse(
696            &claim.expires_at,
697            &time::format_description::well_known::Rfc3339,
698        )
699        .map(|expires_at| expires_at > now)
700        .unwrap_or(false)
701    });
702}
703
704fn rfc3339_offset(seconds: i64) -> String {
705    (time::OffsetDateTime::now_utc() + time::Duration::seconds(seconds))
706        .format(&time::format_description::well_known::Rfc3339)
707        .unwrap_or_else(|_| crate::report::now_rfc3339())
708}
709
710fn now_rfc3339() -> String {
711    rfc3339_offset(0)
712}
713
714fn rfc3339_after_seconds(seconds: i64) -> String {
715    rfc3339_offset(seconds)
716}
717
718fn join_state_path(source_root: &str, entity_name: &str) -> String {
719    if source_root.is_empty() || source_root == "." {
720        format!(".floe/state/{entity_name}/{ENTITY_STATE_FILENAME}")
721    } else {
722        format!(
723            "{}/.floe/state/{entity_name}/{ENTITY_STATE_FILENAME}",
724            source_root.trim_end_matches(is_path_separator)
725        )
726    }
727}
728
729fn derive_source_root(
730    raw_path: &str,
731    source_format: &str,
732    resolved_local_path: Option<&Path>,
733) -> String {
734    let trimmed = raw_path.trim_end_matches(is_path_separator);
735    if trimmed.is_empty() {
736        return String::new();
737    }
738
739    if let Some(prefix) = prefix_before_first_glob(trimmed) {
740        if prefix.is_empty() {
741            return String::new();
742        }
743        if prefix.ends_with(is_path_separator) {
744            return prefix.trim_end_matches(is_path_separator).to_string();
745        }
746        return parent_like(prefix)
747            .unwrap_or(prefix)
748            .trim_end_matches(is_path_separator)
749            .to_string();
750    }
751
752    if let Some(path) = resolved_local_path.filter(|path| path.exists()) {
753        if path.is_dir() {
754            return trimmed.to_string();
755        }
756        if path.is_file() {
757            return parent_like(trimmed)
758                .unwrap_or(trimmed)
759                .trim_end_matches(is_path_separator)
760                .to_string();
761        }
762    }
763
764    if matches_source_file_suffix(trimmed, source_format) {
765        return parent_like(trimmed)
766            .unwrap_or(trimmed)
767            .trim_end_matches(is_path_separator)
768            .to_string();
769    }
770
771    trimmed.to_string()
772}
773
774fn prefix_before_first_glob(value: &str) -> Option<&str> {
775    let index = value.find(['*', '?', '['])?;
776    Some(&value[..index])
777}
778
779fn matches_source_file_suffix(value: &str, source_format: &str) -> bool {
780    let Some(segment) = value.rsplit(is_path_separator).next() else {
781        return false;
782    };
783    let segment = segment.to_ascii_lowercase();
784
785    extensions::suffixes_for_format(source_format)
786        .map(|suffixes| {
787            suffixes
788                .iter()
789                .any(|suffix| segment.ends_with(&suffix.to_ascii_lowercase()))
790        })
791        .unwrap_or(false)
792}
793
794fn parent_like(value: &str) -> Option<&str> {
795    value.rfind(is_path_separator).map(|index| {
796        if index == 0 {
797            &value[..1]
798        } else {
799            &value[..index]
800        }
801    })
802}
803
804fn is_path_separator(ch: char) -> bool {
805    ch == '/' || ch == '\\'
806}
807
808fn is_remote_uri(value: &str) -> bool {
809    value.starts_with("s3://") || value.starts_with("gs://") || value.starts_with("abfs://")
810}
811
812pub fn validate_entity_state(entity: &EntityConfig, state: EntityState) -> FloeResult<EntityState> {
813    validate_entity_state_name(&entity.name, state)
814}
815
816fn validate_entity_state_name(entity_name: &str, state: EntityState) -> FloeResult<EntityState> {
817    if state.schema != ENTITY_STATE_SCHEMA_V1 && state.schema != ENTITY_STATE_SCHEMA_V2 {
818        return Err(Box::new(ConfigError(format!(
819            "entity.name={} state schema mismatch: expected {} or {}, got {}",
820            entity_name, ENTITY_STATE_SCHEMA_V1, ENTITY_STATE_SCHEMA_V2, state.schema
821        ))));
822    }
823
824    if state.entity != entity_name {
825        return Err(Box::new(ConfigError(format!(
826            "entity.name={} state entity mismatch: expected {}, got {}",
827            entity_name, entity_name, state.entity
828        ))));
829    }
830
831    Ok(state)
832}