Skip to main content

floe_core/config/
storage.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::config::{RootConfig, StorageDefinition};
5use crate::{ConfigError, FloeResult};
6
7#[derive(Debug, Clone)]
8pub struct ConfigBase {
9    local_dir: PathBuf,
10    remote_base: Option<RemoteConfigBase>,
11}
12
13impl ConfigBase {
14    pub fn local_from_path(path: &Path) -> Self {
15        let local_dir = path
16            .parent()
17            .unwrap_or_else(|| Path::new("."))
18            .to_path_buf();
19        Self {
20            local_dir,
21            remote_base: None,
22        }
23    }
24
25    pub fn remote_from_uri(local_dir: PathBuf, uri: &str) -> FloeResult<Self> {
26        Ok(Self {
27            local_dir,
28            remote_base: Some(RemoteConfigBase::from_uri(uri)?),
29        })
30    }
31
32    pub fn local_dir(&self) -> &Path {
33        &self.local_dir
34    }
35
36    pub fn remote_base(&self) -> Option<&RemoteConfigBase> {
37        self.remote_base.as_ref()
38    }
39}
40
/// Object store that a remotely loaded config file lives in.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RemoteScheme {
    /// Amazon S3 (`s3://bucket/key`).
    S3,
    /// Google Cloud Storage (`gs://bucket/key`).
    Gcs,
    /// Azure Data Lake Storage (`abfs://container@account.dfs.core.windows.net/path`).
    Adls,
}
47
48#[derive(Debug, Clone)]
49pub struct RemoteConfigBase {
50    scheme: RemoteScheme,
51    bucket: String,
52    account: Option<String>,
53    container: Option<String>,
54    prefix: String,
55}
56
57impl RemoteConfigBase {
58    fn from_uri(uri: &str) -> FloeResult<Self> {
59        if let Some((bucket, key)) = parse_s3_uri(uri) {
60            let prefix = parent_prefix(&key);
61            return Ok(Self {
62                scheme: RemoteScheme::S3,
63                bucket,
64                account: None,
65                container: None,
66                prefix,
67            });
68        }
69        if let Some((bucket, key)) = parse_gcs_uri(uri) {
70            let prefix = parent_prefix(&key);
71            return Ok(Self {
72                scheme: RemoteScheme::Gcs,
73                bucket,
74                account: None,
75                container: None,
76                prefix,
77            });
78        }
79        if let Some((container, account, path)) = parse_adls_uri(uri) {
80            let prefix = parent_prefix(&path);
81            return Ok(Self {
82                scheme: RemoteScheme::Adls,
83                bucket: String::new(),
84                account: Some(account),
85                container: Some(container),
86                prefix,
87            });
88        }
89        Err(Box::new(ConfigError(format!(
90            "unsupported config uri: {}",
91            uri
92        ))))
93    }
94
95    fn matches_storage(&self, storage_type: &str) -> bool {
96        matches!(
97            (self.scheme, storage_type),
98            (RemoteScheme::S3, "s3") | (RemoteScheme::Gcs, "gcs") | (RemoteScheme::Adls, "adls")
99        )
100    }
101
102    fn join(&self, relative: &str) -> String {
103        match self.scheme {
104            RemoteScheme::S3 => {
105                let key = join_s3_key(&self.prefix, relative);
106                format_s3_uri(&self.bucket, &key)
107            }
108            RemoteScheme::Gcs => {
109                let key = join_s3_key(&self.prefix, relative);
110                format_gcs_uri(&self.bucket, &key)
111            }
112            RemoteScheme::Adls => {
113                let combined = join_adls_path(&self.prefix, relative);
114                let container = self.container.as_ref().expect("container");
115                let account = self.account.as_ref().expect("account");
116                format_adls_uri(container, account, &combined)
117            }
118        }
119    }
120}
121
/// Outcome of resolving a configured path against a storage definition.
#[derive(Debug, Clone)]
pub struct ResolvedPath {
    /// Name of the storage definition that was used (`"local"` for the implicit one).
    pub storage: String,
    /// Fully qualified location (`local://`, `s3://`, `gs://` or `abfs://`).
    pub uri: String,
    /// Filesystem path; only `Some` when the storage is local.
    pub local_path: Option<PathBuf>,
}
128
129#[derive(Clone)]
130pub struct StorageResolver {
131    config_base: ConfigBase,
132    default_name: String,
133    definitions: HashMap<String, StorageDefinition>,
134    has_config: bool,
135}
136
137impl StorageResolver {
138    pub fn config_local_dir(&self) -> &Path {
139        self.config_base.local_dir()
140    }
141
142    pub fn config_is_remote(&self) -> bool {
143        self.config_base.remote_base().is_some()
144    }
145
146    pub fn resolve_local_path(&self, raw_path: &str) -> FloeResult<ResolvedPath> {
147        if is_remote_uri(raw_path) {
148            return Err(Box::new(ConfigError(format!(
149                "entity.state.path must be a local path (got {})",
150                raw_path
151            ))));
152        }
153        if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
154            return Err(Box::new(ConfigError(
155                "entity.state.path must be absolute when config is remote".to_string(),
156            )));
157        }
158        let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
159        Ok(ResolvedPath {
160            storage: "local".to_string(),
161            uri: local_uri(&resolved),
162            local_path: Some(resolved),
163        })
164    }
165
166    pub fn new(config: &RootConfig, config_base: ConfigBase) -> FloeResult<Self> {
167        if let Some(storages) = &config.storages {
168            let mut definitions = HashMap::new();
169            for definition in &storages.definitions {
170                if definitions
171                    .insert(definition.name.clone(), definition.clone())
172                    .is_some()
173                {
174                    return Err(Box::new(ConfigError(format!(
175                        "storages.definitions name={} is duplicated",
176                        definition.name
177                    ))));
178                }
179            }
180            let default_name = storages
181                .default
182                .clone()
183                .ok_or_else(|| Box::new(ConfigError("storages.default is required".to_string())))?;
184            if !definitions.contains_key(&default_name) {
185                return Err(Box::new(ConfigError(format!(
186                    "storages.default={} does not match any definition",
187                    default_name
188                ))));
189            }
190            Ok(Self {
191                config_base,
192                default_name,
193                definitions,
194                has_config: true,
195            })
196        } else {
197            Ok(Self {
198                config_base,
199                default_name: "local".to_string(),
200                definitions: HashMap::new(),
201                has_config: false,
202            })
203        }
204    }
205
206    pub fn from_path(config: &RootConfig, config_path: &Path) -> FloeResult<Self> {
207        Self::new(config, ConfigBase::local_from_path(config_path))
208    }
209
210    pub fn resolve_path(
211        &self,
212        entity_name: &str,
213        field: &str,
214        storage_name: Option<&str>,
215        raw_path: &str,
216    ) -> FloeResult<ResolvedPath> {
217        let name = storage_name.unwrap_or(self.default_name.as_str());
218        if !self.has_config && name != "local" && !self.definitions.contains_key(name) {
219            return Err(Box::new(ConfigError(format!(
220                "entity.name={} {field} references unknown storage {} (no storages block)",
221                entity_name, name
222            ))));
223        }
224
225        let definition = if self.has_config {
226            self.definitions.get(name).cloned().ok_or_else(|| {
227                Box::new(ConfigError(format!(
228                    "entity.name={} {field} references unknown storage {}",
229                    entity_name, name
230                )))
231            })?
232        } else {
233            StorageDefinition {
234                name: "local".to_string(),
235                fs_type: "local".to_string(),
236                bucket: None,
237                region: None,
238                account: None,
239                container: None,
240                prefix: None,
241                endpoint: None,
242                path_style_access: None,
243            }
244        };
245
246        let resolved_remote = self
247            .resolve_remote_relative(&definition, raw_path)
248            .unwrap_or_else(|| raw_path.to_string());
249        let raw_path = resolved_remote.as_str();
250
251        match definition.fs_type.as_str() {
252            "local" => {
253                if is_remote_uri(raw_path) {
254                    return Err(Box::new(ConfigError(format!(
255                        "entity.name={} {field} must be a local path (got {})",
256                        entity_name, raw_path
257                    ))));
258                }
259                if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
260                    return Err(Box::new(ConfigError(format!(
261                        "entity.name={} {field} must be absolute when config is remote",
262                        entity_name
263                    ))));
264                }
265                let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
266                Ok(ResolvedPath {
267                    storage: name.to_string(),
268                    uri: local_uri(&resolved),
269                    local_path: Some(resolved),
270                })
271            }
272            "s3" => {
273                let uri = resolve_s3_uri(&definition, raw_path)?;
274                Ok(ResolvedPath {
275                    storage: name.to_string(),
276                    uri,
277                    local_path: None,
278                })
279            }
280            "adls" => {
281                let uri = resolve_adls_uri(&definition, raw_path)?;
282                Ok(ResolvedPath {
283                    storage: name.to_string(),
284                    uri,
285                    local_path: None,
286                })
287            }
288            "gcs" => {
289                let uri = resolve_gcs_uri(&definition, raw_path)?;
290                Ok(ResolvedPath {
291                    storage: name.to_string(),
292                    uri,
293                    local_path: None,
294                })
295            }
296            _ => Err(Box::new(ConfigError(format!(
297                "storage type {} is unsupported",
298                definition.fs_type
299            )))),
300        }
301    }
302
303    pub fn resolve_report_path(
304        &self,
305        storage_name: Option<&str>,
306        raw_path: &str,
307    ) -> FloeResult<ResolvedPath> {
308        let name = storage_name.unwrap_or(self.default_name.as_str());
309        if !self.has_config && name != "local" && !self.definitions.contains_key(name) {
310            return Err(Box::new(ConfigError(format!(
311                "report.storage references unknown storage {} (no storages block)",
312                name
313            ))));
314        }
315
316        let definition = if self.has_config {
317            self.definitions.get(name).cloned().ok_or_else(|| {
318                Box::new(ConfigError(format!(
319                    "report.storage references unknown storage {}",
320                    name
321                )))
322            })?
323        } else {
324            StorageDefinition {
325                name: "local".to_string(),
326                fs_type: "local".to_string(),
327                bucket: None,
328                region: None,
329                account: None,
330                container: None,
331                prefix: None,
332                endpoint: None,
333                path_style_access: None,
334            }
335        };
336
337        let resolved_remote = self
338            .resolve_remote_relative(&definition, raw_path)
339            .unwrap_or_else(|| raw_path.to_string());
340        let raw_path = resolved_remote.as_str();
341
342        match definition.fs_type.as_str() {
343            "local" => {
344                if is_remote_uri(raw_path) {
345                    return Err(Box::new(ConfigError(format!(
346                        "report.path must be a local path (got {})",
347                        raw_path
348                    ))));
349                }
350                if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
351                    return Err(Box::new(ConfigError(
352                        "report.path must be absolute when config is remote".to_string(),
353                    )));
354                }
355                let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
356                Ok(ResolvedPath {
357                    storage: name.to_string(),
358                    uri: local_uri(&resolved),
359                    local_path: Some(resolved),
360                })
361            }
362            "s3" => {
363                let uri = resolve_s3_uri(&definition, raw_path)?;
364                Ok(ResolvedPath {
365                    storage: name.to_string(),
366                    uri,
367                    local_path: None,
368                })
369            }
370            "adls" => {
371                let uri = resolve_adls_uri(&definition, raw_path)?;
372                Ok(ResolvedPath {
373                    storage: name.to_string(),
374                    uri,
375                    local_path: None,
376                })
377            }
378            "gcs" => {
379                let uri = resolve_gcs_uri(&definition, raw_path)?;
380                Ok(ResolvedPath {
381                    storage: name.to_string(),
382                    uri,
383                    local_path: None,
384                })
385            }
386            _ => Err(Box::new(ConfigError(format!(
387                "storage type {} is unsupported",
388                definition.fs_type
389            )))),
390        }
391    }
392
393    /// Scan definitions for the first one whose scheme and bucket/account match `uri`.
394    /// Used in manifest mode to resolve a bare report URI back to a named definition.
395    pub fn find_definition_name_for_uri(&self, uri: &str) -> Option<String> {
396        for (name, def) in &self.definitions {
397            if uri.starts_with("s3://") && def.fs_type == "s3" {
398                if let Some(b) = &def.bucket {
399                    if uri.starts_with(&format!("s3://{b}/")) || uri == format!("s3://{b}") {
400                        return Some(name.clone());
401                    }
402                }
403            }
404            if uri.starts_with("gs://") && def.fs_type == "gcs" {
405                if let Some(b) = &def.bucket {
406                    if uri.starts_with(&format!("gs://{b}/")) || uri == format!("gs://{b}") {
407                        return Some(name.clone());
408                    }
409                }
410            }
411            if uri.starts_with("abfs://") && def.fs_type == "adls" {
412                if let (Some(c), Some(a)) = (&def.container, &def.account) {
413                    if uri.starts_with(&format!("abfs://{c}@{a}.dfs.core.windows.net")) {
414                        return Some(name.clone());
415                    }
416                }
417            }
418        }
419        None
420    }
421
422    /// Register a synthetic `StorageDefinition` into this resolver.
423    /// Used in manifest mode when the report URI has no matching definition in the config.
424    /// Does NOT flip `has_config`; entity resolution keeps its implicit-local fallback.
425    pub fn register_definition(&mut self, definition: StorageDefinition) {
426        self.definitions.insert(definition.name.clone(), definition);
427    }
428
429    pub fn definition(&self, name: &str) -> Option<StorageDefinition> {
430        if self.has_config {
431            self.definitions.get(name).cloned()
432        } else if let Some(def) = self.definitions.get(name) {
433            // Synthetic definition registered by register_definition (e.g. report target).
434            Some(def.clone())
435        } else if name == "local" {
436            Some(StorageDefinition {
437                name: "local".to_string(),
438                fs_type: "local".to_string(),
439                bucket: None,
440                region: None,
441                account: None,
442                container: None,
443                prefix: None,
444                endpoint: None,
445                path_style_access: None,
446            })
447        } else {
448            None
449        }
450    }
451
452    pub fn default_storage_name(&self) -> &str {
453        self.default_name.as_str()
454    }
455
456    pub fn config_dir(&self) -> &Path {
457        self.config_base.local_dir()
458    }
459
460    fn resolve_remote_relative(
461        &self,
462        definition: &StorageDefinition,
463        raw_path: &str,
464    ) -> Option<String> {
465        if !is_relative_path(raw_path) {
466            return None;
467        }
468        if definition.prefix.is_some() {
469            return None;
470        }
471        let remote = self.config_base.remote_base()?;
472        if !remote.matches_storage(definition.fs_type.as_str()) {
473            return None;
474        }
475        Some(remote.join(raw_path))
476    }
477}
478
479pub fn resolve_local_path(config_dir: &Path, raw_path: &str) -> PathBuf {
480    let path = Path::new(raw_path);
481    let resolved = if path.is_absolute() {
482        path.to_path_buf()
483    } else {
484        config_dir.join(path)
485    };
486    crate::io::storage::paths::normalize_local_path(&resolved)
487}
488
489fn local_uri(path: &Path) -> String {
490    let normalized = crate::io::storage::paths::normalize_local_path(path);
491    format!("local://{}", normalized.display())
492}
493
494fn resolve_s3_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
495    let bucket = definition.bucket.as_ref().ok_or_else(|| {
496        Box::new(ConfigError(format!(
497            "storage {} requires bucket for type s3",
498            definition.name
499        )))
500    })?;
501    if let Some((bucket_in_path, key)) = parse_s3_uri(raw_path) {
502        if bucket_in_path != *bucket {
503            return Err(Box::new(ConfigError(format!(
504                "storage {} bucket mismatch: {}",
505                definition.name, bucket_in_path
506            ))));
507        }
508        return Ok(format_s3_uri(bucket, &key));
509    }
510
511    let key = join_s3_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
512    Ok(format_s3_uri(bucket, &key))
513}
514
515fn resolve_adls_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
516    let account = definition.account.as_ref().ok_or_else(|| {
517        Box::new(ConfigError(format!(
518            "storage {} requires account for type adls",
519            definition.name
520        )))
521    })?;
522    let container = definition.container.as_ref().ok_or_else(|| {
523        Box::new(ConfigError(format!(
524            "storage {} requires container for type adls",
525            definition.name
526        )))
527    })?;
528    if let Some((container_in_path, account_in_path, path)) = parse_adls_uri(raw_path) {
529        if container_in_path != *container || account_in_path != *account {
530            return Err(Box::new(ConfigError(format!(
531                "storage {} adls account/container mismatch",
532                definition.name
533            ))));
534        }
535        return Ok(format_adls_uri(container, account, &path));
536    }
537    let prefix = definition.prefix.as_deref().unwrap_or("");
538    let combined = join_adls_path(prefix, raw_path);
539    Ok(format_adls_uri(container, account, &combined))
540}
541
/// Joins an ADLS `prefix` and `raw_path` with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of
/// `raw_path`; empty parts are skipped.
fn join_adls_path(prefix: &str, raw_path: &str) -> String {
    let parts = [prefix.trim_matches('/'), raw_path.trim_start_matches('/')];
    parts
        .iter()
        .copied()
        .filter(|part| !part.is_empty())
        .collect::<Vec<_>>()
        .join("/")
}
552
/// Builds `abfs://{container}@{account}.dfs.core.windows.net[/path]`.
///
/// The `/path` part is omitted when `path` is empty.
fn format_adls_uri(container: &str, account: &str, path: &str) -> String {
    let root = format!("abfs://{container}@{account}.dfs.core.windows.net");
    if path.is_empty() {
        root
    } else {
        format!("{root}/{path}")
    }
}
563
/// Splits `s3://bucket/key` into `(bucket, key)`.
///
/// Returns `None` when the scheme is not `s3://` or the bucket is empty; the
/// key is empty when no `/` follows the bucket.
fn parse_s3_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("s3://")?;
    let (bucket, key) = rest.split_once('/').unwrap_or((rest, ""));
    if bucket.is_empty() {
        None
    } else {
        Some((bucket.to_owned(), key.to_owned()))
    }
}
574
/// Joins an S3 key `prefix` and `raw_path` with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of
/// `raw_path`; an empty side contributes nothing.
fn join_s3_key(prefix: &str, raw_path: &str) -> String {
    let head = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');
    if head.is_empty() {
        return tail.to_string();
    }
    if tail.is_empty() {
        return head.to_string();
    }
    [head, tail].join("/")
}
585
/// Builds `s3://bucket[/key]`, omitting `/key` when `key` is empty.
fn format_s3_uri(bucket: &str, key: &str) -> String {
    let mut uri = format!("s3://{bucket}");
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
593
594fn resolve_gcs_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
595    let bucket = definition.bucket.as_ref().ok_or_else(|| {
596        Box::new(ConfigError(format!(
597            "storage {} requires bucket for type gcs",
598            definition.name
599        )))
600    })?;
601    if let Some((bucket_in_path, key)) = parse_gcs_uri(raw_path) {
602        if bucket_in_path != *bucket {
603            return Err(Box::new(ConfigError(format!(
604                "storage {} bucket mismatch: {}",
605                definition.name, bucket_in_path
606            ))));
607        }
608        return Ok(format_gcs_uri(bucket, &key));
609    }
610
611    let key = join_gcs_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
612    Ok(format_gcs_uri(bucket, &key))
613}
614
/// Splits `gs://bucket/key` into `(bucket, key)`.
///
/// Returns `None` for other schemes or an empty bucket; the key is empty when
/// nothing follows the bucket.
fn parse_gcs_uri(value: &str) -> Option<(String, String)> {
    let body = value.strip_prefix("gs://")?;
    let slash = body.find('/');
    let bucket = match slash {
        Some(idx) => &body[..idx],
        None => body,
    };
    let key = match slash {
        Some(idx) => &body[idx + 1..],
        None => "",
    };
    (!bucket.is_empty()).then(|| (bucket.to_string(), key.to_string()))
}
625
/// Joins a GCS object `prefix` and `raw_path` with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of
/// `raw_path`; an empty side contributes nothing.
fn join_gcs_key(prefix: &str, raw_path: &str) -> String {
    let mut key = String::from(prefix.trim_matches('/'));
    let rest = raw_path.trim_start_matches('/');
    if !rest.is_empty() {
        if !key.is_empty() {
            key.push('/');
        }
        key.push_str(rest);
    }
    key
}
636
/// Builds `gs://bucket[/key]`, omitting `/key` when `key` is empty.
fn format_gcs_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("gs://{bucket}"),
        _ => format!("gs://{bucket}/{key}"),
    }
}
644
/// Splits `abfs://container@account.dfs.core.windows.net[/path]` into
/// `(container, account, path)` with leading slashes removed from `path`.
///
/// Returns `None` for other schemes, an empty container or account, an
/// account containing `/`, or a host that merely starts with
/// `.dfs.core.windows.net`. Previously the text after that marker was not
/// checked, so `…windows.net.evil.com/x` or `…windows.netfoo` were accepted
/// and their suffix became part of the path.
fn parse_adls_uri(value: &str) -> Option<(String, String, String)> {
    let stripped = value.strip_prefix("abfs://")?;
    let (container, rest) = stripped.split_once('@')?;
    let (account, path) = rest.split_once(".dfs.core.windows.net")?;
    if container.is_empty() || account.is_empty() || account.contains('/') {
        return None;
    }
    // The host must end right here: either nothing follows or a path begins.
    if !(path.is_empty() || path.starts_with('/')) {
        return None;
    }
    Some((
        container.to_string(),
        account.to_string(),
        path.trim_start_matches('/').to_string(),
    ))
}
655
/// Returns everything before the last `/` in `key`, or `""` if there is none.
fn parent_prefix(key: &str) -> String {
    key.rfind('/')
        .map(|idx| key[..idx].to_string())
        .unwrap_or_default()
}
662
/// Whether `value` starts with one of the supported object-store schemes
/// (`s3://`, `gs://`, `abfs://`).
pub(crate) fn is_remote_uri(value: &str) -> bool {
    ["s3://", "gs://", "abfs://"]
        .iter()
        .any(|&scheme| value.starts_with(scheme))
}
666
667fn is_relative_path(value: &str) -> bool {
668    !is_remote_uri(value) && Path::new(value).is_relative()
669}