Skip to main content

floe_core/config/
storage.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::config::{RootConfig, StorageDefinition};
5use crate::{ConfigError, FloeResult};
6
7#[derive(Debug, Clone)]
8pub struct ConfigBase {
9    local_dir: PathBuf,
10    remote_base: Option<RemoteConfigBase>,
11}
12
13impl ConfigBase {
14    pub fn local_from_path(path: &Path) -> Self {
15        let local_dir = path
16            .parent()
17            .unwrap_or_else(|| Path::new("."))
18            .to_path_buf();
19        Self {
20            local_dir,
21            remote_base: None,
22        }
23    }
24
25    pub fn remote_from_uri(local_dir: PathBuf, uri: &str) -> FloeResult<Self> {
26        Ok(Self {
27            local_dir,
28            remote_base: Some(RemoteConfigBase::from_uri(uri)?),
29        })
30    }
31
32    pub fn local_dir(&self) -> &Path {
33        &self.local_dir
34    }
35
36    pub fn remote_base(&self) -> Option<&RemoteConfigBase> {
37        self.remote_base.as_ref()
38    }
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42enum RemoteScheme {
43    S3,
44    Gcs,
45    Adls,
46}
47
48#[derive(Debug, Clone)]
49pub struct RemoteConfigBase {
50    scheme: RemoteScheme,
51    bucket: String,
52    account: Option<String>,
53    container: Option<String>,
54    prefix: String,
55}
56
57impl RemoteConfigBase {
58    fn from_uri(uri: &str) -> FloeResult<Self> {
59        if let Some((bucket, key)) = parse_s3_uri(uri) {
60            let prefix = parent_prefix(&key);
61            return Ok(Self {
62                scheme: RemoteScheme::S3,
63                bucket,
64                account: None,
65                container: None,
66                prefix,
67            });
68        }
69        if let Some((bucket, key)) = parse_gcs_uri(uri) {
70            let prefix = parent_prefix(&key);
71            return Ok(Self {
72                scheme: RemoteScheme::Gcs,
73                bucket,
74                account: None,
75                container: None,
76                prefix,
77            });
78        }
79        if let Some((container, account, path)) = parse_adls_uri(uri) {
80            let prefix = parent_prefix(&path);
81            return Ok(Self {
82                scheme: RemoteScheme::Adls,
83                bucket: String::new(),
84                account: Some(account),
85                container: Some(container),
86                prefix,
87            });
88        }
89        Err(Box::new(ConfigError(format!(
90            "unsupported config uri: {}",
91            uri
92        ))))
93    }
94
95    fn matches_storage(&self, storage_type: &str) -> bool {
96        matches!(
97            (self.scheme, storage_type),
98            (RemoteScheme::S3, "s3") | (RemoteScheme::Gcs, "gcs") | (RemoteScheme::Adls, "adls")
99        )
100    }
101
102    fn join(&self, relative: &str) -> String {
103        match self.scheme {
104            RemoteScheme::S3 => {
105                let key = join_s3_key(&self.prefix, relative);
106                format_s3_uri(&self.bucket, &key)
107            }
108            RemoteScheme::Gcs => {
109                let key = join_s3_key(&self.prefix, relative);
110                format_gcs_uri(&self.bucket, &key)
111            }
112            RemoteScheme::Adls => {
113                let combined = join_adls_path(&self.prefix, relative);
114                let container = self.container.as_ref().expect("container");
115                let account = self.account.as_ref().expect("account");
116                format_adls_uri(container, account, &combined)
117            }
118        }
119    }
120}
121
/// Result of resolving a configured path against a storage.
#[derive(Debug, Clone)]
pub struct ResolvedPath {
    /// Name of the storage the path was resolved against.
    pub storage: String,
    /// Fully-qualified URI of the resolved location.
    pub uri: String,
    /// Filesystem path for locally resolved paths; `None` for remote ones.
    pub local_path: Option<PathBuf>,
}
128
129#[derive(Clone)]
130pub struct StorageResolver {
131    config_base: ConfigBase,
132    default_name: String,
133    definitions: HashMap<String, StorageDefinition>,
134    has_config: bool,
135}
136
137impl StorageResolver {
138    pub fn config_local_dir(&self) -> &Path {
139        self.config_base.local_dir()
140    }
141
142    pub fn config_is_remote(&self) -> bool {
143        self.config_base.remote_base().is_some()
144    }
145
146    pub fn resolve_local_path(&self, raw_path: &str) -> FloeResult<ResolvedPath> {
147        if is_remote_uri(raw_path) {
148            return Err(Box::new(ConfigError(format!(
149                "entity.state.path must be a local path (got {})",
150                raw_path
151            ))));
152        }
153        if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
154            return Err(Box::new(ConfigError(
155                "entity.state.path must be absolute when config is remote".to_string(),
156            )));
157        }
158        let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
159        Ok(ResolvedPath {
160            storage: "local".to_string(),
161            uri: local_uri(&resolved),
162            local_path: Some(resolved),
163        })
164    }
165
166    pub fn new(config: &RootConfig, config_base: ConfigBase) -> FloeResult<Self> {
167        if let Some(storages) = &config.storages {
168            let mut definitions = HashMap::new();
169            for definition in &storages.definitions {
170                if definitions
171                    .insert(definition.name.clone(), definition.clone())
172                    .is_some()
173                {
174                    return Err(Box::new(ConfigError(format!(
175                        "storages.definitions name={} is duplicated",
176                        definition.name
177                    ))));
178                }
179            }
180            let default_name = storages
181                .default
182                .clone()
183                .ok_or_else(|| Box::new(ConfigError("storages.default is required".to_string())))?;
184            if !definitions.contains_key(&default_name) {
185                return Err(Box::new(ConfigError(format!(
186                    "storages.default={} does not match any definition",
187                    default_name
188                ))));
189            }
190            Ok(Self {
191                config_base,
192                default_name,
193                definitions,
194                has_config: true,
195            })
196        } else {
197            Ok(Self {
198                config_base,
199                default_name: "local".to_string(),
200                definitions: HashMap::new(),
201                has_config: false,
202            })
203        }
204    }
205
206    pub fn from_path(config: &RootConfig, config_path: &Path) -> FloeResult<Self> {
207        Self::new(config, ConfigBase::local_from_path(config_path))
208    }
209
210    pub fn resolve_path(
211        &self,
212        entity_name: &str,
213        field: &str,
214        storage_name: Option<&str>,
215        raw_path: &str,
216    ) -> FloeResult<ResolvedPath> {
217        let name = storage_name.unwrap_or(self.default_name.as_str());
218        if !self.has_config && name != "local" {
219            return Err(Box::new(ConfigError(format!(
220                "entity.name={} {field} references unknown storage {} (no storages block)",
221                entity_name, name
222            ))));
223        }
224
225        let definition = if self.has_config {
226            self.definitions.get(name).cloned().ok_or_else(|| {
227                Box::new(ConfigError(format!(
228                    "entity.name={} {field} references unknown storage {}",
229                    entity_name, name
230                )))
231            })?
232        } else {
233            StorageDefinition {
234                name: "local".to_string(),
235                fs_type: "local".to_string(),
236                bucket: None,
237                region: None,
238                account: None,
239                container: None,
240                prefix: None,
241            }
242        };
243
244        let resolved_remote = self
245            .resolve_remote_relative(&definition, raw_path)
246            .unwrap_or_else(|| raw_path.to_string());
247        let raw_path = resolved_remote.as_str();
248
249        match definition.fs_type.as_str() {
250            "local" => {
251                if is_remote_uri(raw_path) {
252                    return Err(Box::new(ConfigError(format!(
253                        "entity.name={} {field} must be a local path (got {})",
254                        entity_name, raw_path
255                    ))));
256                }
257                if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
258                    return Err(Box::new(ConfigError(format!(
259                        "entity.name={} {field} must be absolute when config is remote",
260                        entity_name
261                    ))));
262                }
263                let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
264                Ok(ResolvedPath {
265                    storage: name.to_string(),
266                    uri: local_uri(&resolved),
267                    local_path: Some(resolved),
268                })
269            }
270            "s3" => {
271                let uri = resolve_s3_uri(&definition, raw_path)?;
272                Ok(ResolvedPath {
273                    storage: name.to_string(),
274                    uri,
275                    local_path: None,
276                })
277            }
278            "adls" => {
279                let uri = resolve_adls_uri(&definition, raw_path)?;
280                Ok(ResolvedPath {
281                    storage: name.to_string(),
282                    uri,
283                    local_path: None,
284                })
285            }
286            "gcs" => {
287                let uri = resolve_gcs_uri(&definition, raw_path)?;
288                Ok(ResolvedPath {
289                    storage: name.to_string(),
290                    uri,
291                    local_path: None,
292                })
293            }
294            _ => Err(Box::new(ConfigError(format!(
295                "storage type {} is unsupported",
296                definition.fs_type
297            )))),
298        }
299    }
300
301    pub fn resolve_report_path(
302        &self,
303        storage_name: Option<&str>,
304        raw_path: &str,
305    ) -> FloeResult<ResolvedPath> {
306        let name = storage_name.unwrap_or(self.default_name.as_str());
307        if !self.has_config && name != "local" {
308            return Err(Box::new(ConfigError(format!(
309                "report.storage references unknown storage {} (no storages block)",
310                name
311            ))));
312        }
313
314        let definition = if self.has_config {
315            self.definitions.get(name).cloned().ok_or_else(|| {
316                Box::new(ConfigError(format!(
317                    "report.storage references unknown storage {}",
318                    name
319                )))
320            })?
321        } else {
322            StorageDefinition {
323                name: "local".to_string(),
324                fs_type: "local".to_string(),
325                bucket: None,
326                region: None,
327                account: None,
328                container: None,
329                prefix: None,
330            }
331        };
332
333        let resolved_remote = self
334            .resolve_remote_relative(&definition, raw_path)
335            .unwrap_or_else(|| raw_path.to_string());
336        let raw_path = resolved_remote.as_str();
337
338        match definition.fs_type.as_str() {
339            "local" => {
340                if is_remote_uri(raw_path) {
341                    return Err(Box::new(ConfigError(format!(
342                        "report.path must be a local path (got {})",
343                        raw_path
344                    ))));
345                }
346                if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
347                    return Err(Box::new(ConfigError(
348                        "report.path must be absolute when config is remote".to_string(),
349                    )));
350                }
351                let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
352                Ok(ResolvedPath {
353                    storage: name.to_string(),
354                    uri: local_uri(&resolved),
355                    local_path: Some(resolved),
356                })
357            }
358            "s3" => {
359                let uri = resolve_s3_uri(&definition, raw_path)?;
360                Ok(ResolvedPath {
361                    storage: name.to_string(),
362                    uri,
363                    local_path: None,
364                })
365            }
366            "adls" => {
367                let uri = resolve_adls_uri(&definition, raw_path)?;
368                Ok(ResolvedPath {
369                    storage: name.to_string(),
370                    uri,
371                    local_path: None,
372                })
373            }
374            "gcs" => {
375                let uri = resolve_gcs_uri(&definition, raw_path)?;
376                Ok(ResolvedPath {
377                    storage: name.to_string(),
378                    uri,
379                    local_path: None,
380                })
381            }
382            _ => Err(Box::new(ConfigError(format!(
383                "storage type {} is unsupported",
384                definition.fs_type
385            )))),
386        }
387    }
388
389    pub fn definition(&self, name: &str) -> Option<StorageDefinition> {
390        if self.has_config {
391            self.definitions.get(name).cloned()
392        } else if name == "local" {
393            Some(StorageDefinition {
394                name: "local".to_string(),
395                fs_type: "local".to_string(),
396                bucket: None,
397                region: None,
398                account: None,
399                container: None,
400                prefix: None,
401            })
402        } else {
403            None
404        }
405    }
406
407    pub fn default_storage_name(&self) -> &str {
408        self.default_name.as_str()
409    }
410
411    pub fn config_dir(&self) -> &Path {
412        self.config_base.local_dir()
413    }
414
415    fn resolve_remote_relative(
416        &self,
417        definition: &StorageDefinition,
418        raw_path: &str,
419    ) -> Option<String> {
420        if !is_relative_path(raw_path) {
421            return None;
422        }
423        if definition.prefix.is_some() {
424            return None;
425        }
426        let remote = self.config_base.remote_base()?;
427        if !remote.matches_storage(definition.fs_type.as_str()) {
428            return None;
429        }
430        Some(remote.join(raw_path))
431    }
432}
433
434pub fn resolve_local_path(config_dir: &Path, raw_path: &str) -> PathBuf {
435    let path = Path::new(raw_path);
436    let resolved = if path.is_absolute() {
437        path.to_path_buf()
438    } else {
439        config_dir.join(path)
440    };
441    crate::io::storage::paths::normalize_local_path(&resolved)
442}
443
444fn local_uri(path: &Path) -> String {
445    let normalized = crate::io::storage::paths::normalize_local_path(path);
446    format!("local://{}", normalized.display())
447}
448
449fn resolve_s3_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
450    let bucket = definition.bucket.as_ref().ok_or_else(|| {
451        Box::new(ConfigError(format!(
452            "storage {} requires bucket for type s3",
453            definition.name
454        )))
455    })?;
456    if let Some((bucket_in_path, key)) = parse_s3_uri(raw_path) {
457        if bucket_in_path != *bucket {
458            return Err(Box::new(ConfigError(format!(
459                "storage {} bucket mismatch: {}",
460                definition.name, bucket_in_path
461            ))));
462        }
463        return Ok(format_s3_uri(bucket, &key));
464    }
465
466    let key = join_s3_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
467    Ok(format_s3_uri(bucket, &key))
468}
469
470fn resolve_adls_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
471    let account = definition.account.as_ref().ok_or_else(|| {
472        Box::new(ConfigError(format!(
473            "storage {} requires account for type adls",
474            definition.name
475        )))
476    })?;
477    let container = definition.container.as_ref().ok_or_else(|| {
478        Box::new(ConfigError(format!(
479            "storage {} requires container for type adls",
480            definition.name
481        )))
482    })?;
483    if let Some((container_in_path, account_in_path, path)) = parse_adls_uri(raw_path) {
484        if container_in_path != *container || account_in_path != *account {
485            return Err(Box::new(ConfigError(format!(
486                "storage {} adls account/container mismatch",
487                definition.name
488            ))));
489        }
490        return Ok(format_adls_uri(container, account, &path));
491    }
492    let prefix = definition.prefix.as_deref().unwrap_or("");
493    let combined = join_adls_path(prefix, raw_path);
494    Ok(format_adls_uri(container, account, &combined))
495}
496
/// Joins `raw_path` onto `prefix` with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of
/// `raw_path`; an empty side contributes nothing (and no separator).
fn join_adls_path(prefix: &str, raw_path: &str) -> String {
    let base = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');
    if base.is_empty() {
        tail.to_string()
    } else if tail.is_empty() {
        base.to_string()
    } else {
        format!("{}/{}", base, tail)
    }
}
507
/// Formats an `abfs://<container>@<account>.dfs.core.windows.net[/<path>]`
/// URI; the `/<path>` part is omitted when `path` is empty.
fn format_adls_uri(container: &str, account: &str, path: &str) -> String {
    let authority = format!("abfs://{}@{}.dfs.core.windows.net", container, account);
    match path {
        "" => authority,
        rest => format!("{}/{}", authority, rest),
    }
}
518
/// Splits an `s3://<bucket>[/<key>]` URI into `(bucket, key)`.
///
/// Returns `None` when `value` lacks the `s3://` prefix or the bucket is
/// empty; the key is empty when there is no `/` after the bucket.
fn parse_s3_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("s3://")?;
    let (bucket, key) = rest.split_once('/').unwrap_or((rest, ""));
    if bucket.is_empty() {
        None
    } else {
        Some((bucket.to_owned(), key.to_owned()))
    }
}
529
/// Joins `raw_path` onto `prefix` with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of
/// `raw_path`; empty parts are dropped so no stray separator is produced.
fn join_s3_key(prefix: &str, raw_path: &str) -> String {
    let parts = [prefix.trim_matches('/'), raw_path.trim_start_matches('/')];
    parts
        .iter()
        .copied()
        .filter(|part| !part.is_empty())
        .collect::<Vec<_>>()
        .join("/")
}
540
/// Formats an `s3://<bucket>[/<key>]` URI; the `/<key>` part is omitted when
/// `key` is empty.
fn format_s3_uri(bucket: &str, key: &str) -> String {
    let mut uri = format!("s3://{}", bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
548
549fn resolve_gcs_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
550    let bucket = definition.bucket.as_ref().ok_or_else(|| {
551        Box::new(ConfigError(format!(
552            "storage {} requires bucket for type gcs",
553            definition.name
554        )))
555    })?;
556    if let Some((bucket_in_path, key)) = parse_gcs_uri(raw_path) {
557        if bucket_in_path != *bucket {
558            return Err(Box::new(ConfigError(format!(
559                "storage {} bucket mismatch: {}",
560                definition.name, bucket_in_path
561            ))));
562        }
563        return Ok(format_gcs_uri(bucket, &key));
564    }
565
566    let key = join_gcs_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
567    Ok(format_gcs_uri(bucket, &key))
568}
569
/// Splits a `gs://<bucket>[/<key>]` URI into `(bucket, key)`.
///
/// Returns `None` when `value` lacks the `gs://` prefix or the bucket is
/// empty; the key is empty when there is no `/` after the bucket.
fn parse_gcs_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("gs://")?;
    let (bucket, key) = match rest.split_once('/') {
        Some(pair) => pair,
        None => (rest, ""),
    };
    (!bucket.is_empty()).then(|| (bucket.to_string(), key.to_string()))
}
580
/// Joins `raw_path` onto `prefix` with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of
/// `raw_path`; the separator is only inserted when both sides are non-empty.
fn join_gcs_key(prefix: &str, raw_path: &str) -> String {
    let tail = raw_path.trim_start_matches('/');
    let mut key = String::from(prefix.trim_matches('/'));
    if !key.is_empty() && !tail.is_empty() {
        key.push('/');
    }
    key.push_str(tail);
    key
}
591
/// Formats a `gs://<bucket>[/<key>]` URI; the `/<key>` part is omitted when
/// `key` is empty.
fn format_gcs_uri(bucket: &str, key: &str) -> String {
    let root = format!("gs://{}", bucket);
    if key.is_empty() {
        return root;
    }
    format!("{}/{}", root, key)
}
599
/// Splits an `abfs://<container>@<account>.dfs.core.windows.net[/<path>]`
/// URI into `(container, account, path)`.
///
/// Leading slashes are stripped from the path, which is empty when the URI
/// ends at the host.
///
/// Returns `None` when the scheme, `@`, or host suffix is missing, when the
/// container or account is empty or contains `/`, or when the host does not
/// end exactly at `.dfs.core.windows.net` (for example
/// `….dfs.core.windows.net.example.com`).
fn parse_adls_uri(value: &str) -> Option<(String, String, String)> {
    const HOST_SUFFIX: &str = ".dfs.core.windows.net";

    let stripped = value.strip_prefix("abfs://")?;
    let (container, rest) = stripped.split_once('@')?;
    let (account, remainder) = rest.split_once(HOST_SUFFIX)?;
    if container.is_empty() || account.is_empty() {
        return None;
    }
    // A `/` here means the `@` or the host suffix was found inside the path
    // portion rather than in the authority.
    if container.contains('/') || account.contains('/') {
        return None;
    }
    // The suffix must terminate the host: only end-of-input or the start of
    // the path may follow it.
    if !(remainder.is_empty() || remainder.starts_with('/')) {
        return None;
    }
    let path = remainder.trim_start_matches('/');
    Some((container.to_string(), account.to_string(), path.to_string()))
}
610
/// Returns everything before the last `/` of `key`, or an empty string when
/// `key` contains no `/`.
fn parent_prefix(key: &str) -> String {
    key.rsplit_once('/')
        .map_or_else(String::new, |(parent, _)| parent.to_owned())
}
617
/// Returns `true` when `value` starts with `s3://`, `gs://` or `abfs://`.
fn is_remote_uri(value: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 3] = ["s3://", "gs://", "abfs://"];
    REMOTE_SCHEMES
        .iter()
        .any(|scheme| value.starts_with(*scheme))
}
621
622fn is_relative_path(value: &str) -> bool {
623    !is_remote_uri(value) && Path::new(value).is_relative()
624}