1use std::collections::{BTreeMap, HashSet};
2use std::fs;
3use std::path::{Path, PathBuf};
4
5use serde::{Deserialize, Serialize};
6use tempfile::NamedTempFile;
7
8use crate::config::{
9 ConfigBase, EntityConfig, IncrementalMode, ResolvedPath, RootConfig, StorageResolver,
10};
11use crate::io::storage::{
12 extensions, local::LocalClient, CloudClient, ConditionalWrite, StorageClient, StoredObject,
13};
14use crate::{ConfigError, FloeResult};
15
/// Schema tag of the first file-ingest state format. Files carrying it are upgraded in
/// memory to [`ENTITY_STATE_SCHEMA_V2`] with their claims cleared when parsed.
pub const ENTITY_STATE_SCHEMA_V1: &str = "floe.state.file-ingest.v1";
/// Schema tag written for every state object persisted by this module.
pub const ENTITY_STATE_SCHEMA_V2: &str = "floe.state.file-ingest.v2";
/// File name of the default state object under `<source root>/.floe/state/<entity>/`.
pub const ENTITY_STATE_FILENAME: &str = "state.json";
/// Number of optimistic compare-and-swap attempts before a state update gives up.
const STATE_CAS_RETRIES: usize = 5;
/// Lifetime of a file claim, in seconds (one hour).
pub const CLAIM_TTL_SECONDS: i64 = 3_600;
21
22#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
23pub struct EntityState {
24 pub schema: String,
25 pub entity: String,
26 pub updated_at: Option<String>,
27 #[serde(default)]
28 pub files: BTreeMap<String, EntityFileState>,
29 #[serde(default)]
30 pub claims: BTreeMap<String, EntityFileClaim>,
31}
32
33impl EntityState {
34 pub fn new(entity: impl Into<String>) -> Self {
35 Self {
36 schema: ENTITY_STATE_SCHEMA_V2.to_string(),
37 entity: entity.into(),
38 updated_at: None,
39 files: BTreeMap::new(),
40 claims: BTreeMap::new(),
41 }
42 }
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
46pub struct EntityFileState {
47 pub processed_at: String,
48 pub size: Option<u64>,
49 pub mtime: Option<String>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
53pub struct EntityFileClaim {
54 pub run_id: String,
55 pub acquired_at: String,
56 pub expires_at: String,
57 pub size: Option<u64>,
58 pub mtime: Option<String>,
59}
60
61#[derive(Debug, Clone)]
62pub struct EntityStateInspection {
63 pub entity_name: String,
64 pub incremental_mode: IncrementalMode,
65 pub path: ResolvedPath,
66 pub state: Option<EntityState>,
67}
68
69pub fn resolve_entity_state_path(
70 resolver: &StorageResolver,
71 entity: &EntityConfig,
72) -> FloeResult<ResolvedPath> {
73 if let Some(state) = &entity.state {
74 if let Some(path) = state.path.as_deref() {
75 let resolved = if is_remote_uri(path) {
76 resolver.resolve_path(
77 &entity.name,
78 "entity.state.path",
79 entity.source.storage.as_deref(),
80 path,
81 )?
82 } else {
83 resolver.resolve_local_path(path)?
84 };
85 return Ok(resolved);
86 }
87 }
88
89 let resolved_source = resolver.resolve_path(
90 &entity.name,
91 "entity.source.path",
92 entity.source.storage.as_deref(),
93 &entity.source.path,
94 )?;
95 let source_root = derive_source_root(
96 &entity.source.path,
97 &entity.source.format,
98 resolved_source.local_path.as_deref(),
99 );
100 let default_path = join_state_path(&source_root, &entity.name);
101 let resolved = resolver.resolve_path(
102 &entity.name,
103 "entity.state.path",
104 entity.source.storage.as_deref(),
105 &default_path,
106 )?;
107 Ok(resolved)
108}
109
110pub fn read_entity_state(path: &Path) -> FloeResult<Option<EntityState>> {
111 if !path.exists() {
112 return Ok(None);
113 }
114 let payload = fs::read_to_string(path)?;
115 let state = parse_entity_state(payload.as_bytes())?;
116 Ok(Some(state))
117}
118
119fn parse_entity_state(payload: &[u8]) -> FloeResult<EntityState> {
120 let mut state: EntityState = serde_json::from_slice(payload)?;
121 if state.schema == ENTITY_STATE_SCHEMA_V1 {
122 state.schema = ENTITY_STATE_SCHEMA_V2.to_string();
123 state.claims.clear();
124 }
125 Ok(state)
126}
127
/// Location of an entity's state object.
#[derive(Clone, Debug)]
pub enum EntityStateTarget {
    /// State on the local filesystem; `uri` is what is handed to the local storage client.
    Local { path: PathBuf, uri: String },
    /// State in a named remote storage, addressed by `uri`.
    Remote { storage: String, uri: String },
}
133
134#[derive(Debug, Clone)]
135pub struct LoadedEntityState {
136 pub target: EntityStateTarget,
137 pub state: EntityState,
138 pub version: Option<String>,
139 pub existed: bool,
140}
141
142#[derive(Debug, Clone)]
143pub struct ClaimedEntityState {
144 pub target: EntityStateTarget,
145 pub state: EntityState,
146 pub version: Option<String>,
147}
148
149#[derive(Debug, Clone)]
150pub struct EntityStateClaimOutcome {
151 pub pending_inputs: Vec<crate::io::format::InputFile>,
152 pub claimed_state: Option<ClaimedEntityState>,
153 pub active_claims: Vec<String>,
154 pub already_processed: Vec<(crate::io::format::InputFile, EntityFileState)>,
155}
156
157pub fn claim_entity_inputs(
158 resolver: &StorageResolver,
159 cloud: &mut CloudClient,
160 entity: &EntityConfig,
161 run_id: &str,
162 input_files: Vec<crate::io::format::InputFile>,
163) -> FloeResult<EntityStateClaimOutcome> {
164 if input_files.is_empty() {
165 return Ok(EntityStateClaimOutcome {
166 pending_inputs: Vec::new(),
167 claimed_state: None,
168 active_claims: Vec::new(),
169 already_processed: Vec::new(),
170 });
171 }
172
173 for _ in 0..STATE_CAS_RETRIES {
174 let mut loaded = load_entity_state(resolver, cloud, entity)?;
175 remove_expired_claims(&mut loaded.state);
176 let mut pending_inputs = Vec::new();
177 let mut active_claims = Vec::new();
178 let mut already_processed = Vec::new();
179 let acquired_at = now_rfc3339();
180 let expires_at = rfc3339_after_seconds(CLAIM_TTL_SECONDS);
181
182 for input_file in &input_files {
183 if let Some(recorded) = loaded.state.files.get(&input_file.source_uri) {
184 already_processed.push((input_file.clone(), recorded.clone()));
185 continue;
186 }
187 match loaded.state.claims.get(&input_file.source_uri) {
188 Some(_) => {
189 active_claims.push(input_file.source_uri.clone());
190 }
191 None => {
192 loaded.state.claims.insert(
193 input_file.source_uri.clone(),
194 EntityFileClaim {
195 run_id: run_id.to_string(),
196 acquired_at: acquired_at.clone(),
197 expires_at: expires_at.clone(),
198 size: input_file.source_size,
199 mtime: input_file.source_mtime.clone(),
200 },
201 );
202 pending_inputs.push(input_file.clone());
203 }
204 }
205 }
206
207 if pending_inputs.is_empty() {
208 if active_claims.is_empty() {
209 let _ = persist_loaded_state(cloud, resolver, &loaded)?;
210 }
211 return Ok(EntityStateClaimOutcome {
212 pending_inputs,
213 claimed_state: None,
214 active_claims,
215 already_processed,
216 });
217 }
218
219 loaded.state.schema = ENTITY_STATE_SCHEMA_V2.to_string();
220 loaded.state.updated_at = Some(acquired_at);
221 match persist_loaded_state(cloud, resolver, &loaded)? {
222 Some(version) => {
223 return Ok(EntityStateClaimOutcome {
224 pending_inputs,
225 claimed_state: Some(ClaimedEntityState {
226 target: loaded.target,
227 state: loaded.state,
228 version,
229 }),
230 active_claims,
231 already_processed,
232 });
233 }
234 None => continue,
235 }
236 }
237
238 Err(Box::new(ConfigError(format!(
239 "entity.name={} incremental state update conflicted after {STATE_CAS_RETRIES} retries",
240 entity.name
241 ))))
242}
243
244pub fn claim_all_entity_inputs(
253 resolver: &StorageResolver,
254 cloud: &mut CloudClient,
255 entity: &EntityConfig,
256 run_id: &str,
257 input_files: Vec<crate::io::format::InputFile>,
258) -> FloeResult<Option<ClaimedEntityState>> {
259 let acquired_at = now_rfc3339();
260 let expires_at = rfc3339_after_seconds(CLAIM_TTL_SECONDS);
261
262 for _ in 0..STATE_CAS_RETRIES {
263 let loaded = load_entity_state(resolver, cloud, entity)?;
265
266 let mut fresh_state = EntityState::new(&entity.name);
267 fresh_state.files = loaded.state.files.clone();
268 fresh_state.updated_at = Some(acquired_at.clone());
269 for input_file in &input_files {
270 fresh_state.claims.insert(
271 input_file.source_uri.clone(),
272 EntityFileClaim {
273 run_id: run_id.to_string(),
274 acquired_at: acquired_at.clone(),
275 expires_at: expires_at.clone(),
276 size: input_file.source_size,
277 mtime: input_file.source_mtime.clone(),
278 },
279 );
280 }
281
282 let fresh_loaded = LoadedEntityState {
283 target: loaded.target,
284 state: fresh_state,
285 version: loaded.version,
286 existed: loaded.existed,
287 };
288 match persist_loaded_state(cloud, resolver, &fresh_loaded)? {
289 Some(version) => {
290 return Ok(Some(ClaimedEntityState {
291 target: fresh_loaded.target,
292 state: fresh_loaded.state,
293 version,
294 }));
295 }
296 None => continue,
297 }
298 }
299
300 Err(Box::new(ConfigError(format!(
301 "entity.name={} full-refresh state write conflicted after {STATE_CAS_RETRIES} retries",
302 entity.name
303 ))))
304}
305
306pub fn promote_claimed_entity_state(
307 resolver: &StorageResolver,
308 cloud: &mut CloudClient,
309 entity_name: &str,
310 run_id: &str,
311 claimed: &ClaimedEntityState,
312) -> FloeResult<()> {
313 mutate_claimed_state(resolver, cloud, entity_name, claimed, |state, our_uris| {
314 let processed_at = now_rfc3339();
315 let claimed_files: Vec<String> = state
316 .claims
317 .iter()
318 .filter(|(uri, claim)| claim.run_id == run_id && our_uris.contains(*uri))
319 .map(|(source_uri, _)| source_uri.clone())
320 .collect();
321 for source_uri in claimed_files {
322 if let Some(claim) = state.claims.remove(&source_uri) {
323 state.files.insert(
324 source_uri,
325 EntityFileState {
326 processed_at: processed_at.clone(),
327 size: claim.size,
328 mtime: claim.mtime,
329 },
330 );
331 }
332 }
333 state.updated_at = Some(processed_at);
334 })
335}
336
337pub fn promote_full_refresh_claimed_entity_state(
338 resolver: &StorageResolver,
339 cloud: &mut CloudClient,
340 entity_name: &str,
341 run_id: &str,
342 claimed: &ClaimedEntityState,
343) -> FloeResult<()> {
344 mutate_claimed_state(resolver, cloud, entity_name, claimed, |state, our_uris| {
345 let processed_at = now_rfc3339();
346 let claimed_files: Vec<String> = state
347 .claims
348 .iter()
349 .filter(|(uri, claim)| claim.run_id == run_id && our_uris.contains(*uri))
350 .map(|(uri, _)| uri.clone())
351 .collect();
352 state.files.clear();
353 for source_uri in claimed_files {
354 if let Some(claim) = state.claims.remove(&source_uri) {
355 state.files.insert(
356 source_uri,
357 EntityFileState {
358 processed_at: processed_at.clone(),
359 size: claim.size,
360 mtime: claim.mtime,
361 },
362 );
363 }
364 }
365 state.updated_at = Some(processed_at);
366 })
367}
368
369pub fn release_claimed_entity_state(
370 resolver: &StorageResolver,
371 cloud: &mut CloudClient,
372 entity_name: &str,
373 run_id: &str,
374 claimed: &ClaimedEntityState,
375) -> FloeResult<()> {
376 mutate_claimed_state(resolver, cloud, entity_name, claimed, |state, our_uris| {
377 state
378 .claims
379 .retain(|uri, claim| !(claim.run_id == run_id && our_uris.contains(uri)));
380 state.updated_at = Some(now_rfc3339());
381 })
382}
383
384pub fn renew_claimed_entity_state(
385 resolver: &StorageResolver,
386 cloud: &mut CloudClient,
387 entity_name: &str,
388 run_id: &str,
389 claimed: &ClaimedEntityState,
390) -> FloeResult<()> {
391 mutate_claimed_state(resolver, cloud, entity_name, claimed, |state, our_uris| {
392 let now = now_rfc3339();
393 let expires_at = rfc3339_after_seconds(CLAIM_TTL_SECONDS);
394 for (uri, claim) in state.claims.iter_mut() {
395 if claim.run_id == run_id && our_uris.contains(uri) {
396 claim.expires_at = expires_at.clone();
397 }
398 }
399 state.updated_at = Some(now);
400 })
401}
402
403fn mutate_claimed_state(
404 resolver: &StorageResolver,
405 cloud: &mut CloudClient,
406 entity_name: &str,
407 claimed: &ClaimedEntityState,
408 mutate: impl Fn(&mut EntityState, &HashSet<String>),
409) -> FloeResult<()> {
410 let our_uris: HashSet<String> = claimed.state.claims.keys().cloned().collect();
411 for attempt in 0..STATE_CAS_RETRIES {
412 let mut loaded = if attempt == 0 {
413 LoadedEntityState {
414 target: claimed.target.clone(),
415 state: claimed.state.clone(),
416 version: claimed.version.clone(),
417 existed: claimed.version.is_some(),
418 }
419 } else {
420 load_target_state_with_entity_name(
421 cloud,
422 resolver,
423 entity_name,
424 claimed.target.clone(),
425 )?
426 };
427 mutate(&mut loaded.state, &our_uris);
428 loaded.state.schema = ENTITY_STATE_SCHEMA_V2.to_string();
429 let persisted = if loaded.state.files.is_empty() && loaded.state.claims.is_empty() {
430 delete_loaded_state(cloud, resolver, &loaded)?
431 } else {
432 persist_loaded_state(cloud, resolver, &loaded)?
433 };
434 if persisted.is_some() {
435 return Ok(());
436 }
437 }
438 Err(Box::new(ConfigError(format!(
439 "entity.name={} incremental state update conflicted after {STATE_CAS_RETRIES} retries",
440 entity_name
441 ))))
442}
443
444pub fn inspect_entity_state_with_base(
445 config_path: &Path,
446 config_base: ConfigBase,
447 entity_name: &str,
448) -> FloeResult<EntityStateInspection> {
449 let config = crate::load_config(config_path)?;
450 inspect_entity_state(&config, config_base, entity_name)
451}
452
453pub fn inspect_entity_state(
454 config: &RootConfig,
455 config_base: ConfigBase,
456 entity_name: &str,
457) -> FloeResult<EntityStateInspection> {
458 let (entity, path) = resolve_entity_state_target(config, config_base.clone(), entity_name)?;
459 let resolver = StorageResolver::new(config, config_base)?;
460 let target = state_target_from_resolved(&path)?;
461 let mut cloud = CloudClient::new();
462 let loaded = load_target_state_with_resolver(&mut cloud, &resolver, entity, target)?;
463 let state = loaded.existed.then_some(loaded.state);
464
465 Ok(EntityStateInspection {
466 entity_name: entity.name.clone(),
467 incremental_mode: entity.incremental_mode,
468 path,
469 state,
470 })
471}
472
473pub fn reset_entity_state(
474 config: &RootConfig,
475 config_base: ConfigBase,
476 entity_name: &str,
477) -> FloeResult<bool> {
478 let (entity, path) = resolve_entity_state_target(config, config_base.clone(), entity_name)?;
479 let target = state_target_from_resolved(&path)?;
480 match target {
481 EntityStateTarget::Local { path, .. } => {
482 if path.exists() {
483 fs::remove_file(&path)?;
484 return Ok(true);
485 }
486 Ok(false)
487 }
488 EntityStateTarget::Remote { storage, uri } => {
489 let mut cloud = CloudClient::new();
490 let resolver = StorageResolver::new(config, config_base)?;
491 let client = cloud.client_for_context(
492 &resolver,
493 &storage,
494 &format!("entity.name={}", entity.name),
495 )?;
496 let Some(object) = client.read_object(&uri)? else {
497 return Ok(false);
498 };
499 match client.delete_object_conditional(&uri, Some(&object.version))? {
500 ConditionalWrite::Written { .. } => Ok(true),
501 ConditionalWrite::Conflict => Err(Box::new(ConfigError(format!(
502 "entity.name={} remote state changed while resetting: {}",
503 entity.name, uri
504 )))),
505 }
506 }
507 }
508}
509
510pub fn reset_entity_state_with_base(
511 config_path: &Path,
512 config_base: ConfigBase,
513 entity_name: &str,
514) -> FloeResult<bool> {
515 let config = crate::load_config(config_path)?;
516 reset_entity_state(&config, config_base, entity_name)
517}
518
519fn resolve_entity_state_target<'a>(
520 config: &'a RootConfig,
521 config_base: ConfigBase,
522 entity_name: &str,
523) -> FloeResult<(&'a EntityConfig, ResolvedPath)> {
524 let entity = config
525 .entities
526 .iter()
527 .find(|entity| entity.name == entity_name)
528 .ok_or_else(|| {
529 Box::new(ConfigError(format!("entity not found: {entity_name}")))
530 as Box<dyn std::error::Error + Send + Sync>
531 })?;
532 let resolver = StorageResolver::new(config, config_base)?;
533 let path = resolve_entity_state_path(&resolver, entity)?;
534 Ok((entity, path))
535}
536
537pub fn write_entity_state_atomic(path: &Path, state: &EntityState) -> FloeResult<()> {
538 let parent = path.parent().ok_or_else(|| {
539 Box::new(ConfigError(format!(
540 "state path has no parent directory: {}",
541 path.display()
542 ))) as Box<dyn std::error::Error + Send + Sync>
543 })?;
544 fs::create_dir_all(parent)?;
545
546 let mut temp = NamedTempFile::new_in(parent)?;
547 serde_json::to_writer_pretty(temp.as_file_mut(), state)?;
548 temp.as_file_mut().sync_all()?;
549 temp.persist(path).map_err(|err| err.error)?;
550 Ok(())
551}
552
553fn load_entity_state(
554 resolver: &StorageResolver,
555 cloud: &mut CloudClient,
556 entity: &EntityConfig,
557) -> FloeResult<LoadedEntityState> {
558 let resolved = resolve_entity_state_path(resolver, entity)?;
559 let target = state_target_from_resolved(&resolved)?;
560 load_target_state_with_resolver(cloud, resolver, entity, target)
561}
562
563fn load_target_state_with_resolver(
564 cloud: &mut CloudClient,
565 resolver: &StorageResolver,
566 entity: &EntityConfig,
567 target: EntityStateTarget,
568) -> FloeResult<LoadedEntityState> {
569 load_target_state_with_entity_name(cloud, resolver, &entity.name, target)
570}
571
572fn load_target_state_with_entity_name(
573 cloud: &mut CloudClient,
574 resolver: &StorageResolver,
575 entity_name: &str,
576 target: EntityStateTarget,
577) -> FloeResult<LoadedEntityState> {
578 match target {
579 EntityStateTarget::Local { path, uri } => {
580 let object = LocalClient::new().read_object(&uri)?;
581 let (state, version, existed) = resolve_loaded_state(entity_name, object)?;
582 Ok(LoadedEntityState {
583 target: EntityStateTarget::Local { path, uri },
584 state,
585 version,
586 existed,
587 })
588 }
589 EntityStateTarget::Remote { storage, uri } => {
590 let client = cloud.client_for_context(
591 resolver,
592 &storage,
593 &format!("entity.name={entity_name}"),
594 )?;
595 let object = client.read_object(&uri)?;
596 let (state, version, existed) = resolve_loaded_state(entity_name, object)?;
597 Ok(LoadedEntityState {
598 target: EntityStateTarget::Remote { storage, uri },
599 state,
600 version,
601 existed,
602 })
603 }
604 }
605}
606
607fn resolve_loaded_state(
608 entity_name: &str,
609 object: Option<StoredObject>,
610) -> FloeResult<(EntityState, Option<String>, bool)> {
611 match object {
612 Some(object) => Ok((
613 validate_entity_state_name(entity_name, parse_entity_state(&object.body)?)?,
614 Some(object.version),
615 true,
616 )),
617 None => Ok((EntityState::new(entity_name), None, false)),
618 }
619}
620
621fn with_state_client<R, F>(
622 cloud: &mut CloudClient,
623 resolver: &StorageResolver,
624 target: &EntityStateTarget,
625 f: F,
626) -> FloeResult<R>
627where
628 F: FnOnce(&str, &dyn StorageClient) -> FloeResult<R>,
629{
630 match target {
631 EntityStateTarget::Local { uri, .. } => f(uri, &LocalClient::new()),
632 EntityStateTarget::Remote { uri, storage } => {
633 let client = cloud.client_for_context(resolver, storage, "entity state")?;
634 f(uri, client)
635 }
636 }
637}
638
639fn conditional_write_to_version(cw: ConditionalWrite) -> Option<Option<String>> {
640 match cw {
641 ConditionalWrite::Written { version } => Some(Some(version)),
642 ConditionalWrite::Conflict => None,
643 }
644}
645
646fn persist_loaded_state(
647 cloud: &mut CloudClient,
648 resolver: &StorageResolver,
649 loaded: &LoadedEntityState,
650) -> FloeResult<Option<Option<String>>> {
651 let mut state = loaded.state.clone();
652 state.schema = ENTITY_STATE_SCHEMA_V2.to_string();
653 let body = serde_json::to_vec_pretty(&state)?;
654 let version = loaded.version.as_deref();
655 let cw = with_state_client(cloud, resolver, &loaded.target, |uri, client| {
656 client.write_object_conditional(uri, version, &body)
657 })?;
658 Ok(conditional_write_to_version(cw))
659}
660
661fn delete_loaded_state(
662 cloud: &mut CloudClient,
663 resolver: &StorageResolver,
664 loaded: &LoadedEntityState,
665) -> FloeResult<Option<Option<String>>> {
666 let version = loaded.version.as_deref();
667 let cw = with_state_client(cloud, resolver, &loaded.target, |uri, client| {
668 client.delete_object_conditional(uri, version)
669 })?;
670 Ok(conditional_write_to_version(cw))
671}
672
673fn state_target_from_resolved(resolved: &ResolvedPath) -> FloeResult<EntityStateTarget> {
674 if let Some(path) = &resolved.local_path {
675 return Ok(EntityStateTarget::Local {
676 path: path.clone(),
677 uri: resolved.uri.clone(),
678 });
679 }
680 if is_remote_uri(&resolved.uri) {
681 return Ok(EntityStateTarget::Remote {
682 storage: resolved.storage.clone(),
683 uri: resolved.uri.clone(),
684 });
685 }
686 Err(Box::new(ConfigError(format!(
687 "state path is neither local nor supported remote: {}",
688 resolved.uri
689 ))))
690}
691
692fn remove_expired_claims(state: &mut EntityState) {
693 let now = time::OffsetDateTime::now_utc();
694 state.claims.retain(|_, claim| {
695 time::OffsetDateTime::parse(
696 &claim.expires_at,
697 &time::format_description::well_known::Rfc3339,
698 )
699 .map(|expires_at| expires_at > now)
700 .unwrap_or(false)
701 });
702}
703
704fn rfc3339_offset(seconds: i64) -> String {
705 (time::OffsetDateTime::now_utc() + time::Duration::seconds(seconds))
706 .format(&time::format_description::well_known::Rfc3339)
707 .unwrap_or_else(|_| crate::report::now_rfc3339())
708}
709
710fn now_rfc3339() -> String {
711 rfc3339_offset(0)
712}
713
714fn rfc3339_after_seconds(seconds: i64) -> String {
715 rfc3339_offset(seconds)
716}
717
718fn join_state_path(source_root: &str, entity_name: &str) -> String {
719 if source_root.is_empty() || source_root == "." {
720 format!(".floe/state/{entity_name}/{ENTITY_STATE_FILENAME}")
721 } else {
722 format!(
723 "{}/.floe/state/{entity_name}/{ENTITY_STATE_FILENAME}",
724 source_root.trim_end_matches(is_path_separator)
725 )
726 }
727}
728
729fn derive_source_root(
730 raw_path: &str,
731 source_format: &str,
732 resolved_local_path: Option<&Path>,
733) -> String {
734 let trimmed = raw_path.trim_end_matches(is_path_separator);
735 if trimmed.is_empty() {
736 return String::new();
737 }
738
739 if let Some(prefix) = prefix_before_first_glob(trimmed) {
740 if prefix.is_empty() {
741 return String::new();
742 }
743 if prefix.ends_with(is_path_separator) {
744 return prefix.trim_end_matches(is_path_separator).to_string();
745 }
746 return parent_like(prefix)
747 .unwrap_or(prefix)
748 .trim_end_matches(is_path_separator)
749 .to_string();
750 }
751
752 if let Some(path) = resolved_local_path.filter(|path| path.exists()) {
753 if path.is_dir() {
754 return trimmed.to_string();
755 }
756 if path.is_file() {
757 return parent_like(trimmed)
758 .unwrap_or(trimmed)
759 .trim_end_matches(is_path_separator)
760 .to_string();
761 }
762 }
763
764 if matches_source_file_suffix(trimmed, source_format) {
765 return parent_like(trimmed)
766 .unwrap_or(trimmed)
767 .trim_end_matches(is_path_separator)
768 .to_string();
769 }
770
771 trimmed.to_string()
772}
773
/// Returns the text before the first glob metacharacter (`*`, `?`, `[`), or `None` if
/// `value` contains none.
fn prefix_before_first_glob(value: &str) -> Option<&str> {
    value
        .char_indices()
        .find(|&(_, ch)| matches!(ch, '*' | '?' | '['))
        .map(|(index, _)| &value[..index])
}
778
779fn matches_source_file_suffix(value: &str, source_format: &str) -> bool {
780 let Some(segment) = value.rsplit(is_path_separator).next() else {
781 return false;
782 };
783 let segment = segment.to_ascii_lowercase();
784
785 extensions::suffixes_for_format(source_format)
786 .map(|suffixes| {
787 suffixes
788 .iter()
789 .any(|suffix| segment.ends_with(&suffix.to_ascii_lowercase()))
790 })
791 .unwrap_or(false)
792}
793
794fn parent_like(value: &str) -> Option<&str> {
795 value.rfind(is_path_separator).map(|index| {
796 if index == 0 {
797 &value[..1]
798 } else {
799 &value[..index]
800 }
801 })
802}
803
/// Treats both `/` and `\` as path separators, regardless of platform.
fn is_path_separator(ch: char) -> bool {
    matches!(ch, '/' | '\\')
}
807
/// Returns whether `value` uses one of the supported remote schemes (`s3`, `gs`, `abfs`).
fn is_remote_uri(value: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 3] = ["s3://", "gs://", "abfs://"];
    REMOTE_SCHEMES
        .iter()
        .any(|&scheme| value.starts_with(scheme))
}
811
812pub fn validate_entity_state(entity: &EntityConfig, state: EntityState) -> FloeResult<EntityState> {
813 validate_entity_state_name(&entity.name, state)
814}
815
816fn validate_entity_state_name(entity_name: &str, state: EntityState) -> FloeResult<EntityState> {
817 if state.schema != ENTITY_STATE_SCHEMA_V1 && state.schema != ENTITY_STATE_SCHEMA_V2 {
818 return Err(Box::new(ConfigError(format!(
819 "entity.name={} state schema mismatch: expected {} or {}, got {}",
820 entity_name, ENTITY_STATE_SCHEMA_V1, ENTITY_STATE_SCHEMA_V2, state.schema
821 ))));
822 }
823
824 if state.entity != entity_name {
825 return Err(Box::new(ConfigError(format!(
826 "entity.name={} state entity mismatch: expected {}, got {}",
827 entity_name, entity_name, state.entity
828 ))));
829 }
830
831 Ok(state)
832}