Skip to main content

floe_core/config/
storage.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::config::{RootConfig, StorageDefinition};
5use crate::{ConfigError, FloeResult};
6
7#[derive(Debug, Clone)]
8pub struct ConfigBase {
9    local_dir: PathBuf,
10    remote_base: Option<RemoteConfigBase>,
11}
12
13impl ConfigBase {
14    pub fn local_from_path(path: &Path) -> Self {
15        let local_dir = path
16            .parent()
17            .unwrap_or_else(|| Path::new("."))
18            .to_path_buf();
19        Self {
20            local_dir,
21            remote_base: None,
22        }
23    }
24
25    pub fn remote_from_uri(local_dir: PathBuf, uri: &str) -> FloeResult<Self> {
26        Ok(Self {
27            local_dir,
28            remote_base: Some(RemoteConfigBase::from_uri(uri)?),
29        })
30    }
31
32    pub fn local_dir(&self) -> &Path {
33        &self.local_dir
34    }
35
36    pub fn remote_base(&self) -> Option<&RemoteConfigBase> {
37        self.remote_base.as_ref()
38    }
39}
40
/// Object-storage scheme of a remote config location.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RemoteScheme {
    /// `s3://<bucket>/<key>`
    S3,
    /// `gs://<bucket>/<key>`
    Gcs,
    /// `abfs://<container>@<account>.dfs.core.windows.net/<path>`
    Adls,
}
47
48#[derive(Debug, Clone)]
49pub struct RemoteConfigBase {
50    scheme: RemoteScheme,
51    bucket: String,
52    account: Option<String>,
53    container: Option<String>,
54    prefix: String,
55}
56
57impl RemoteConfigBase {
58    fn from_uri(uri: &str) -> FloeResult<Self> {
59        if let Some((bucket, key)) = parse_s3_uri(uri) {
60            let prefix = parent_prefix(&key);
61            return Ok(Self {
62                scheme: RemoteScheme::S3,
63                bucket,
64                account: None,
65                container: None,
66                prefix,
67            });
68        }
69        if let Some((bucket, key)) = parse_gcs_uri(uri) {
70            let prefix = parent_prefix(&key);
71            return Ok(Self {
72                scheme: RemoteScheme::Gcs,
73                bucket,
74                account: None,
75                container: None,
76                prefix,
77            });
78        }
79        if let Some((container, account, path)) = parse_adls_uri(uri) {
80            let prefix = parent_prefix(&path);
81            return Ok(Self {
82                scheme: RemoteScheme::Adls,
83                bucket: String::new(),
84                account: Some(account),
85                container: Some(container),
86                prefix,
87            });
88        }
89        Err(Box::new(ConfigError(format!(
90            "unsupported config uri: {}",
91            uri
92        ))))
93    }
94
95    fn matches_storage(&self, storage_type: &str) -> bool {
96        matches!(
97            (self.scheme, storage_type),
98            (RemoteScheme::S3, "s3") | (RemoteScheme::Gcs, "gcs") | (RemoteScheme::Adls, "adls")
99        )
100    }
101
102    fn join(&self, relative: &str) -> String {
103        match self.scheme {
104            RemoteScheme::S3 => {
105                let key = join_s3_key(&self.prefix, relative);
106                format_s3_uri(&self.bucket, &key)
107            }
108            RemoteScheme::Gcs => {
109                let key = join_s3_key(&self.prefix, relative);
110                format_gcs_uri(&self.bucket, &key)
111            }
112            RemoteScheme::Adls => {
113                let combined = join_adls_path(&self.prefix, relative);
114                let container = self.container.as_ref().expect("container");
115                let account = self.account.as_ref().expect("account");
116                format_adls_uri(container, account, &combined)
117            }
118        }
119    }
120}
121
/// Result of resolving a configured path against a storage.
#[derive(Debug, Clone)]
pub struct ResolvedPath {
    /// Name of the storage the path was resolved against (e.g. `"local"`).
    pub storage: String,
    /// Fully-qualified URI: `local://…`, `s3://…`, `gs://…` or `abfs://…`.
    pub uri: String,
    /// Normalized filesystem path for local storage; `None` for remote storage.
    pub local_path: Option<PathBuf>,
}
128
129pub struct StorageResolver {
130    config_base: ConfigBase,
131    default_name: String,
132    definitions: HashMap<String, StorageDefinition>,
133    has_config: bool,
134}
135
136impl StorageResolver {
137    pub fn config_local_dir(&self) -> &Path {
138        self.config_base.local_dir()
139    }
140
141    pub fn config_is_remote(&self) -> bool {
142        self.config_base.remote_base().is_some()
143    }
144
145    pub fn resolve_local_path(&self, raw_path: &str) -> FloeResult<ResolvedPath> {
146        if is_remote_uri(raw_path) {
147            return Err(Box::new(ConfigError(format!(
148                "entity.state.path must be a local path (got {})",
149                raw_path
150            ))));
151        }
152        if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
153            return Err(Box::new(ConfigError(
154                "entity.state.path must be absolute when config is remote".to_string(),
155            )));
156        }
157        let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
158        Ok(ResolvedPath {
159            storage: "local".to_string(),
160            uri: local_uri(&resolved),
161            local_path: Some(resolved),
162        })
163    }
164
165    pub fn new(config: &RootConfig, config_base: ConfigBase) -> FloeResult<Self> {
166        if let Some(storages) = &config.storages {
167            let mut definitions = HashMap::new();
168            for definition in &storages.definitions {
169                if definitions
170                    .insert(definition.name.clone(), definition.clone())
171                    .is_some()
172                {
173                    return Err(Box::new(ConfigError(format!(
174                        "storages.definitions name={} is duplicated",
175                        definition.name
176                    ))));
177                }
178            }
179            let default_name = storages
180                .default
181                .clone()
182                .ok_or_else(|| Box::new(ConfigError("storages.default is required".to_string())))?;
183            if !definitions.contains_key(&default_name) {
184                return Err(Box::new(ConfigError(format!(
185                    "storages.default={} does not match any definition",
186                    default_name
187                ))));
188            }
189            Ok(Self {
190                config_base,
191                default_name,
192                definitions,
193                has_config: true,
194            })
195        } else {
196            Ok(Self {
197                config_base,
198                default_name: "local".to_string(),
199                definitions: HashMap::new(),
200                has_config: false,
201            })
202        }
203    }
204
205    pub fn from_path(config: &RootConfig, config_path: &Path) -> FloeResult<Self> {
206        Self::new(config, ConfigBase::local_from_path(config_path))
207    }
208
209    pub fn resolve_path(
210        &self,
211        entity_name: &str,
212        field: &str,
213        storage_name: Option<&str>,
214        raw_path: &str,
215    ) -> FloeResult<ResolvedPath> {
216        let name = storage_name.unwrap_or(self.default_name.as_str());
217        if !self.has_config && name != "local" {
218            return Err(Box::new(ConfigError(format!(
219                "entity.name={} {field} references unknown storage {} (no storages block)",
220                entity_name, name
221            ))));
222        }
223
224        let definition = if self.has_config {
225            self.definitions.get(name).cloned().ok_or_else(|| {
226                Box::new(ConfigError(format!(
227                    "entity.name={} {field} references unknown storage {}",
228                    entity_name, name
229                )))
230            })?
231        } else {
232            StorageDefinition {
233                name: "local".to_string(),
234                fs_type: "local".to_string(),
235                bucket: None,
236                region: None,
237                account: None,
238                container: None,
239                prefix: None,
240            }
241        };
242
243        let resolved_remote = self
244            .resolve_remote_relative(&definition, raw_path)
245            .unwrap_or_else(|| raw_path.to_string());
246        let raw_path = resolved_remote.as_str();
247
248        match definition.fs_type.as_str() {
249            "local" => {
250                if is_remote_uri(raw_path) {
251                    return Err(Box::new(ConfigError(format!(
252                        "entity.name={} {field} must be a local path (got {})",
253                        entity_name, raw_path
254                    ))));
255                }
256                if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
257                    return Err(Box::new(ConfigError(format!(
258                        "entity.name={} {field} must be absolute when config is remote",
259                        entity_name
260                    ))));
261                }
262                let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
263                Ok(ResolvedPath {
264                    storage: name.to_string(),
265                    uri: local_uri(&resolved),
266                    local_path: Some(resolved),
267                })
268            }
269            "s3" => {
270                let uri = resolve_s3_uri(&definition, raw_path)?;
271                Ok(ResolvedPath {
272                    storage: name.to_string(),
273                    uri,
274                    local_path: None,
275                })
276            }
277            "adls" => {
278                let uri = resolve_adls_uri(&definition, raw_path)?;
279                Ok(ResolvedPath {
280                    storage: name.to_string(),
281                    uri,
282                    local_path: None,
283                })
284            }
285            "gcs" => {
286                let uri = resolve_gcs_uri(&definition, raw_path)?;
287                Ok(ResolvedPath {
288                    storage: name.to_string(),
289                    uri,
290                    local_path: None,
291                })
292            }
293            _ => Err(Box::new(ConfigError(format!(
294                "storage type {} is unsupported",
295                definition.fs_type
296            )))),
297        }
298    }
299
300    pub fn resolve_report_path(
301        &self,
302        storage_name: Option<&str>,
303        raw_path: &str,
304    ) -> FloeResult<ResolvedPath> {
305        let name = storage_name.unwrap_or(self.default_name.as_str());
306        if !self.has_config && name != "local" {
307            return Err(Box::new(ConfigError(format!(
308                "report.storage references unknown storage {} (no storages block)",
309                name
310            ))));
311        }
312
313        let definition = if self.has_config {
314            self.definitions.get(name).cloned().ok_or_else(|| {
315                Box::new(ConfigError(format!(
316                    "report.storage references unknown storage {}",
317                    name
318                )))
319            })?
320        } else {
321            StorageDefinition {
322                name: "local".to_string(),
323                fs_type: "local".to_string(),
324                bucket: None,
325                region: None,
326                account: None,
327                container: None,
328                prefix: None,
329            }
330        };
331
332        let resolved_remote = self
333            .resolve_remote_relative(&definition, raw_path)
334            .unwrap_or_else(|| raw_path.to_string());
335        let raw_path = resolved_remote.as_str();
336
337        match definition.fs_type.as_str() {
338            "local" => {
339                if is_remote_uri(raw_path) {
340                    return Err(Box::new(ConfigError(format!(
341                        "report.path must be a local path (got {})",
342                        raw_path
343                    ))));
344                }
345                if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
346                    return Err(Box::new(ConfigError(
347                        "report.path must be absolute when config is remote".to_string(),
348                    )));
349                }
350                let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
351                Ok(ResolvedPath {
352                    storage: name.to_string(),
353                    uri: local_uri(&resolved),
354                    local_path: Some(resolved),
355                })
356            }
357            "s3" => {
358                let uri = resolve_s3_uri(&definition, raw_path)?;
359                Ok(ResolvedPath {
360                    storage: name.to_string(),
361                    uri,
362                    local_path: None,
363                })
364            }
365            "adls" => {
366                let uri = resolve_adls_uri(&definition, raw_path)?;
367                Ok(ResolvedPath {
368                    storage: name.to_string(),
369                    uri,
370                    local_path: None,
371                })
372            }
373            "gcs" => {
374                let uri = resolve_gcs_uri(&definition, raw_path)?;
375                Ok(ResolvedPath {
376                    storage: name.to_string(),
377                    uri,
378                    local_path: None,
379                })
380            }
381            _ => Err(Box::new(ConfigError(format!(
382                "storage type {} is unsupported",
383                definition.fs_type
384            )))),
385        }
386    }
387
388    pub fn definition(&self, name: &str) -> Option<StorageDefinition> {
389        if self.has_config {
390            self.definitions.get(name).cloned()
391        } else if name == "local" {
392            Some(StorageDefinition {
393                name: "local".to_string(),
394                fs_type: "local".to_string(),
395                bucket: None,
396                region: None,
397                account: None,
398                container: None,
399                prefix: None,
400            })
401        } else {
402            None
403        }
404    }
405
406    pub fn default_storage_name(&self) -> &str {
407        self.default_name.as_str()
408    }
409
410    pub fn config_dir(&self) -> &Path {
411        self.config_base.local_dir()
412    }
413
414    fn resolve_remote_relative(
415        &self,
416        definition: &StorageDefinition,
417        raw_path: &str,
418    ) -> Option<String> {
419        if !is_relative_path(raw_path) {
420            return None;
421        }
422        if definition.prefix.is_some() {
423            return None;
424        }
425        let remote = self.config_base.remote_base()?;
426        if !remote.matches_storage(definition.fs_type.as_str()) {
427            return None;
428        }
429        Some(remote.join(raw_path))
430    }
431}
432
433pub fn resolve_local_path(config_dir: &Path, raw_path: &str) -> PathBuf {
434    let path = Path::new(raw_path);
435    let resolved = if path.is_absolute() {
436        path.to_path_buf()
437    } else {
438        config_dir.join(path)
439    };
440    crate::io::storage::paths::normalize_local_path(&resolved)
441}
442
443fn local_uri(path: &Path) -> String {
444    let normalized = crate::io::storage::paths::normalize_local_path(path);
445    format!("local://{}", normalized.display())
446}
447
448fn resolve_s3_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
449    let bucket = definition.bucket.as_ref().ok_or_else(|| {
450        Box::new(ConfigError(format!(
451            "storage {} requires bucket for type s3",
452            definition.name
453        )))
454    })?;
455    if let Some((bucket_in_path, key)) = parse_s3_uri(raw_path) {
456        if bucket_in_path != *bucket {
457            return Err(Box::new(ConfigError(format!(
458                "storage {} bucket mismatch: {}",
459                definition.name, bucket_in_path
460            ))));
461        }
462        return Ok(format_s3_uri(bucket, &key));
463    }
464
465    let key = join_s3_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
466    Ok(format_s3_uri(bucket, &key))
467}
468
469fn resolve_adls_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
470    let account = definition.account.as_ref().ok_or_else(|| {
471        Box::new(ConfigError(format!(
472            "storage {} requires account for type adls",
473            definition.name
474        )))
475    })?;
476    let container = definition.container.as_ref().ok_or_else(|| {
477        Box::new(ConfigError(format!(
478            "storage {} requires container for type adls",
479            definition.name
480        )))
481    })?;
482    if let Some((container_in_path, account_in_path, path)) = parse_adls_uri(raw_path) {
483        if container_in_path != *container || account_in_path != *account {
484            return Err(Box::new(ConfigError(format!(
485                "storage {} adls account/container mismatch",
486                definition.name
487            ))));
488        }
489        return Ok(format_adls_uri(container, account, &path));
490    }
491    let prefix = definition.prefix.as_deref().unwrap_or("");
492    let combined = join_adls_path(prefix, raw_path);
493    Ok(format_adls_uri(container, account, &combined))
494}
495
/// Joins an ADLS prefix and a relative path with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of `raw_path`; empty
/// parts are skipped so no leading or doubled separator is produced.
fn join_adls_path(prefix: &str, raw_path: &str) -> String {
    let head = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');
    [head, tail]
        .iter()
        .filter(|part| !part.is_empty())
        .copied()
        .collect::<Vec<&str>>()
        .join("/")
}
506
/// Formats an `abfs://` URI; the path segment is omitted when `path` is empty.
fn format_adls_uri(container: &str, account: &str, path: &str) -> String {
    let mut uri = format!("abfs://{}@{}.dfs.core.windows.net", container, account);
    if !path.is_empty() {
        uri.push('/');
        uri.push_str(path);
    }
    uri
}
517
/// Splits `s3://<bucket>/<key>` into `(bucket, key)`.
///
/// Returns `None` for non-`s3://` input or an empty bucket; the key is empty when absent.
fn parse_s3_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("s3://")?;
    let (bucket, key) = rest.split_once('/').unwrap_or((rest, ""));
    if bucket.is_empty() {
        None
    } else {
        Some((bucket.to_owned(), key.to_owned()))
    }
}
528
/// Joins an S3 key prefix and a relative key with a single `/`, skipping empty parts.
fn join_s3_key(prefix: &str, raw_path: &str) -> String {
    let head = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');
    if head.is_empty() {
        return tail.to_string();
    }
    if tail.is_empty() {
        return head.to_string();
    }
    format!("{}/{}", head, tail)
}
539
/// Formats an `s3://` URI; the key segment is omitted when `key` is empty.
fn format_s3_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("s3://{}", bucket),
        _ => format!("s3://{}/{}", bucket, key),
    }
}
547
548fn resolve_gcs_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
549    let bucket = definition.bucket.as_ref().ok_or_else(|| {
550        Box::new(ConfigError(format!(
551            "storage {} requires bucket for type gcs",
552            definition.name
553        )))
554    })?;
555    if let Some((bucket_in_path, key)) = parse_gcs_uri(raw_path) {
556        if bucket_in_path != *bucket {
557            return Err(Box::new(ConfigError(format!(
558                "storage {} bucket mismatch: {}",
559                definition.name, bucket_in_path
560            ))));
561        }
562        return Ok(format_gcs_uri(bucket, &key));
563    }
564
565    let key = join_gcs_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
566    Ok(format_gcs_uri(bucket, &key))
567}
568
/// Splits `gs://<bucket>/<key>` into `(bucket, key)`.
///
/// Returns `None` for non-`gs://` input or an empty bucket; the key is empty when absent.
fn parse_gcs_uri(value: &str) -> Option<(String, String)> {
    let body = value.strip_prefix("gs://")?;
    let (bucket, key) = match body.find('/') {
        Some(slash) => (&body[..slash], &body[slash + 1..]),
        None => (body, ""),
    };
    (!bucket.is_empty()).then(|| (bucket.to_owned(), key.to_owned()))
}
579
/// Joins a GCS key prefix and a relative key with a single `/`, skipping empty parts.
fn join_gcs_key(prefix: &str, raw_path: &str) -> String {
    let mut key = String::from(prefix.trim_matches('/'));
    let tail = raw_path.trim_start_matches('/');
    if !tail.is_empty() {
        if !key.is_empty() {
            key.push('/');
        }
        key.push_str(tail);
    }
    key
}
590
/// Formats a `gs://` URI; the key segment is omitted when `key` is empty.
fn format_gcs_uri(bucket: &str, key: &str) -> String {
    let mut uri = String::from("gs://");
    uri.push_str(bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
598
/// Splits `abfs://<container>@<account>.dfs.core.windows.net/<path>` into
/// `(container, account, path)`.
///
/// Leading slashes are stripped from the path. Returns `None` for non-`abfs://` input, a
/// missing `@` or host suffix, or an empty container/account.
fn parse_adls_uri(value: &str) -> Option<(String, String, String)> {
    const HOST_SUFFIX: &str = ".dfs.core.windows.net";
    let remainder = value.strip_prefix("abfs://")?;
    let at = remainder.find('@')?;
    let container = &remainder[..at];
    let after_container = &remainder[at + 1..];
    let host_start = after_container.find(HOST_SUFFIX)?;
    let account = &after_container[..host_start];
    let path = after_container[host_start + HOST_SUFFIX.len()..].trim_start_matches('/');
    if container.is_empty() || account.is_empty() {
        return None;
    }
    Some((container.to_string(), account.to_string(), path.to_string()))
}
609
/// Returns everything before the last `/` in `key`, or an empty string when there is none.
fn parent_prefix(key: &str) -> String {
    key.rfind('/')
        .map(|slash| key[..slash].to_owned())
        .unwrap_or_default()
}
616
/// Returns `true` when `value` starts with one of the supported remote schemes.
fn is_remote_uri(value: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 3] = ["s3://", "gs://", "abfs://"];
    REMOTE_SCHEMES.iter().any(|scheme| value.starts_with(*scheme))
}
620
621fn is_relative_path(value: &str) -> bool {
622    !is_remote_uri(value) && Path::new(value).is_relative()
623}