Skip to main content

floe_core/config/
storage.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::config::{RootConfig, StorageDefinition};
5use crate::{ConfigError, FloeResult};
6
7#[derive(Debug, Clone)]
8pub struct ConfigBase {
9    local_dir: PathBuf,
10    remote_base: Option<RemoteConfigBase>,
11}
12
13impl ConfigBase {
14    pub fn local_from_path(path: &Path) -> Self {
15        let local_dir = path
16            .parent()
17            .unwrap_or_else(|| Path::new("."))
18            .to_path_buf();
19        Self {
20            local_dir,
21            remote_base: None,
22        }
23    }
24
25    pub fn remote_from_uri(local_dir: PathBuf, uri: &str) -> FloeResult<Self> {
26        Ok(Self {
27            local_dir,
28            remote_base: Some(RemoteConfigBase::from_uri(uri)?),
29        })
30    }
31
32    pub fn local_dir(&self) -> &Path {
33        &self.local_dir
34    }
35
36    pub fn remote_base(&self) -> Option<&RemoteConfigBase> {
37        self.remote_base.as_ref()
38    }
39}
40
/// Remote storage backend that a config URI was recognised as.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RemoteScheme {
    /// Config URI parsed by `parse_s3_uri` (`s3://...`).
    S3,
    /// Config URI parsed by `parse_gcs_uri` (`gs://...`).
    Gcs,
    /// Config URI parsed by `parse_adls_uri` (`abfs://...`).
    Adls,
}
47
48#[derive(Debug, Clone)]
49pub struct RemoteConfigBase {
50    scheme: RemoteScheme,
51    bucket: String,
52    account: Option<String>,
53    container: Option<String>,
54    prefix: String,
55}
56
57impl RemoteConfigBase {
58    fn from_uri(uri: &str) -> FloeResult<Self> {
59        if let Some((bucket, key)) = parse_s3_uri(uri) {
60            let prefix = parent_prefix(&key);
61            return Ok(Self {
62                scheme: RemoteScheme::S3,
63                bucket,
64                account: None,
65                container: None,
66                prefix,
67            });
68        }
69        if let Some((bucket, key)) = parse_gcs_uri(uri) {
70            let prefix = parent_prefix(&key);
71            return Ok(Self {
72                scheme: RemoteScheme::Gcs,
73                bucket,
74                account: None,
75                container: None,
76                prefix,
77            });
78        }
79        if let Some((container, account, path)) = parse_adls_uri(uri) {
80            let prefix = parent_prefix(&path);
81            return Ok(Self {
82                scheme: RemoteScheme::Adls,
83                bucket: String::new(),
84                account: Some(account),
85                container: Some(container),
86                prefix,
87            });
88        }
89        Err(Box::new(ConfigError(format!(
90            "unsupported config uri: {}",
91            uri
92        ))))
93    }
94
95    fn matches_storage(&self, storage_type: &str) -> bool {
96        matches!(
97            (self.scheme, storage_type),
98            (RemoteScheme::S3, "s3") | (RemoteScheme::Gcs, "gcs") | (RemoteScheme::Adls, "adls")
99        )
100    }
101
102    fn join(&self, relative: &str) -> String {
103        match self.scheme {
104            RemoteScheme::S3 => {
105                let key = join_s3_key(&self.prefix, relative);
106                format_s3_uri(&self.bucket, &key)
107            }
108            RemoteScheme::Gcs => {
109                let key = join_s3_key(&self.prefix, relative);
110                format_gcs_uri(&self.bucket, &key)
111            }
112            RemoteScheme::Adls => {
113                let combined = join_adls_path(&self.prefix, relative);
114                let container = self.container.as_ref().expect("container");
115                let account = self.account.as_ref().expect("account");
116                format_adls_uri(container, account, &combined)
117            }
118        }
119    }
120}
121
/// Result of resolving a raw config path against a named storage.
#[derive(Debug, Clone)]
pub struct ResolvedPath {
    /// Name of the storage the path was resolved against.
    pub storage: String,
    /// Fully qualified URI (`local://`, `s3://`, `gs://` or `abfs://`).
    pub uri: String,
    /// Filesystem path for local storages; `None` for remote storages.
    pub local_path: Option<PathBuf>,
}
128
129pub struct StorageResolver {
130    config_base: ConfigBase,
131    default_name: String,
132    definitions: HashMap<String, StorageDefinition>,
133    has_config: bool,
134}
135
136impl StorageResolver {
137    pub fn new(config: &RootConfig, config_base: ConfigBase) -> FloeResult<Self> {
138        if let Some(storages) = &config.storages {
139            let mut definitions = HashMap::new();
140            for definition in &storages.definitions {
141                if definitions
142                    .insert(definition.name.clone(), definition.clone())
143                    .is_some()
144                {
145                    return Err(Box::new(ConfigError(format!(
146                        "storages.definitions name={} is duplicated",
147                        definition.name
148                    ))));
149                }
150            }
151            let default_name = storages
152                .default
153                .clone()
154                .ok_or_else(|| Box::new(ConfigError("storages.default is required".to_string())))?;
155            if !definitions.contains_key(&default_name) {
156                return Err(Box::new(ConfigError(format!(
157                    "storages.default={} does not match any definition",
158                    default_name
159                ))));
160            }
161            Ok(Self {
162                config_base,
163                default_name,
164                definitions,
165                has_config: true,
166            })
167        } else {
168            Ok(Self {
169                config_base,
170                default_name: "local".to_string(),
171                definitions: HashMap::new(),
172                has_config: false,
173            })
174        }
175    }
176
177    pub fn from_path(config: &RootConfig, config_path: &Path) -> FloeResult<Self> {
178        Self::new(config, ConfigBase::local_from_path(config_path))
179    }
180
181    pub fn resolve_path(
182        &self,
183        entity_name: &str,
184        field: &str,
185        storage_name: Option<&str>,
186        raw_path: &str,
187    ) -> FloeResult<ResolvedPath> {
188        let name = storage_name.unwrap_or(self.default_name.as_str());
189        if !self.has_config && name != "local" {
190            return Err(Box::new(ConfigError(format!(
191                "entity.name={} {field} references unknown storage {} (no storages block)",
192                entity_name, name
193            ))));
194        }
195
196        let definition = if self.has_config {
197            self.definitions.get(name).cloned().ok_or_else(|| {
198                Box::new(ConfigError(format!(
199                    "entity.name={} {field} references unknown storage {}",
200                    entity_name, name
201                )))
202            })?
203        } else {
204            StorageDefinition {
205                name: "local".to_string(),
206                fs_type: "local".to_string(),
207                bucket: None,
208                region: None,
209                account: None,
210                container: None,
211                prefix: None,
212            }
213        };
214
215        let resolved_remote = self
216            .resolve_remote_relative(&definition, raw_path)
217            .unwrap_or_else(|| raw_path.to_string());
218        let raw_path = resolved_remote.as_str();
219
220        match definition.fs_type.as_str() {
221            "local" => {
222                if is_remote_uri(raw_path) {
223                    return Err(Box::new(ConfigError(format!(
224                        "entity.name={} {field} must be a local path (got {})",
225                        entity_name, raw_path
226                    ))));
227                }
228                if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
229                    return Err(Box::new(ConfigError(format!(
230                        "entity.name={} {field} must be absolute when config is remote",
231                        entity_name
232                    ))));
233                }
234                let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
235                Ok(ResolvedPath {
236                    storage: name.to_string(),
237                    uri: local_uri(&resolved),
238                    local_path: Some(resolved),
239                })
240            }
241            "s3" => {
242                let uri = resolve_s3_uri(&definition, raw_path)?;
243                Ok(ResolvedPath {
244                    storage: name.to_string(),
245                    uri,
246                    local_path: None,
247                })
248            }
249            "adls" => {
250                let uri = resolve_adls_uri(&definition, raw_path)?;
251                Ok(ResolvedPath {
252                    storage: name.to_string(),
253                    uri,
254                    local_path: None,
255                })
256            }
257            "gcs" => {
258                let uri = resolve_gcs_uri(&definition, raw_path)?;
259                Ok(ResolvedPath {
260                    storage: name.to_string(),
261                    uri,
262                    local_path: None,
263                })
264            }
265            _ => Err(Box::new(ConfigError(format!(
266                "storage type {} is unsupported",
267                definition.fs_type
268            )))),
269        }
270    }
271
272    pub fn resolve_report_path(
273        &self,
274        storage_name: Option<&str>,
275        raw_path: &str,
276    ) -> FloeResult<ResolvedPath> {
277        let name = storage_name.unwrap_or(self.default_name.as_str());
278        if !self.has_config && name != "local" {
279            return Err(Box::new(ConfigError(format!(
280                "report.storage references unknown storage {} (no storages block)",
281                name
282            ))));
283        }
284
285        let definition = if self.has_config {
286            self.definitions.get(name).cloned().ok_or_else(|| {
287                Box::new(ConfigError(format!(
288                    "report.storage references unknown storage {}",
289                    name
290                )))
291            })?
292        } else {
293            StorageDefinition {
294                name: "local".to_string(),
295                fs_type: "local".to_string(),
296                bucket: None,
297                region: None,
298                account: None,
299                container: None,
300                prefix: None,
301            }
302        };
303
304        let resolved_remote = self
305            .resolve_remote_relative(&definition, raw_path)
306            .unwrap_or_else(|| raw_path.to_string());
307        let raw_path = resolved_remote.as_str();
308
309        match definition.fs_type.as_str() {
310            "local" => {
311                if is_remote_uri(raw_path) {
312                    return Err(Box::new(ConfigError(format!(
313                        "report.path must be a local path (got {})",
314                        raw_path
315                    ))));
316                }
317                if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
318                    return Err(Box::new(ConfigError(
319                        "report.path must be absolute when config is remote".to_string(),
320                    )));
321                }
322                let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
323                Ok(ResolvedPath {
324                    storage: name.to_string(),
325                    uri: local_uri(&resolved),
326                    local_path: Some(resolved),
327                })
328            }
329            "s3" => {
330                let uri = resolve_s3_uri(&definition, raw_path)?;
331                Ok(ResolvedPath {
332                    storage: name.to_string(),
333                    uri,
334                    local_path: None,
335                })
336            }
337            "adls" => {
338                let uri = resolve_adls_uri(&definition, raw_path)?;
339                Ok(ResolvedPath {
340                    storage: name.to_string(),
341                    uri,
342                    local_path: None,
343                })
344            }
345            "gcs" => {
346                let uri = resolve_gcs_uri(&definition, raw_path)?;
347                Ok(ResolvedPath {
348                    storage: name.to_string(),
349                    uri,
350                    local_path: None,
351                })
352            }
353            _ => Err(Box::new(ConfigError(format!(
354                "storage type {} is unsupported",
355                definition.fs_type
356            )))),
357        }
358    }
359
360    pub fn definition(&self, name: &str) -> Option<StorageDefinition> {
361        if self.has_config {
362            self.definitions.get(name).cloned()
363        } else if name == "local" {
364            Some(StorageDefinition {
365                name: "local".to_string(),
366                fs_type: "local".to_string(),
367                bucket: None,
368                region: None,
369                account: None,
370                container: None,
371                prefix: None,
372            })
373        } else {
374            None
375        }
376    }
377
378    pub fn default_storage_name(&self) -> &str {
379        self.default_name.as_str()
380    }
381
382    pub fn config_dir(&self) -> &Path {
383        self.config_base.local_dir()
384    }
385
386    fn resolve_remote_relative(
387        &self,
388        definition: &StorageDefinition,
389        raw_path: &str,
390    ) -> Option<String> {
391        if !is_relative_path(raw_path) {
392            return None;
393        }
394        if definition.prefix.is_some() {
395            return None;
396        }
397        let remote = self.config_base.remote_base()?;
398        if !remote.matches_storage(definition.fs_type.as_str()) {
399            return None;
400        }
401        Some(remote.join(raw_path))
402    }
403}
404
/// Resolves `raw_path` against `config_dir`.
///
/// Absolute paths are returned unchanged; relative paths are joined onto
/// `config_dir`.
pub fn resolve_local_path(config_dir: &Path, raw_path: &str) -> PathBuf {
    let candidate = Path::new(raw_path);
    if candidate.is_relative() {
        return config_dir.join(candidate);
    }
    candidate.to_path_buf()
}
413
/// Renders a local path as a `local://` URI using the path's display form.
fn local_uri(path: &Path) -> String {
    let rendered = path.display().to_string();
    let mut uri = String::with_capacity("local://".len() + rendered.len());
    uri.push_str("local://");
    uri.push_str(&rendered);
    uri
}
417
418fn resolve_s3_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
419    let bucket = definition.bucket.as_ref().ok_or_else(|| {
420        Box::new(ConfigError(format!(
421            "storage {} requires bucket for type s3",
422            definition.name
423        )))
424    })?;
425    if let Some((bucket_in_path, key)) = parse_s3_uri(raw_path) {
426        if bucket_in_path != *bucket {
427            return Err(Box::new(ConfigError(format!(
428                "storage {} bucket mismatch: {}",
429                definition.name, bucket_in_path
430            ))));
431        }
432        return Ok(format_s3_uri(bucket, &key));
433    }
434
435    let key = join_s3_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
436    Ok(format_s3_uri(bucket, &key))
437}
438
439fn resolve_adls_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
440    let account = definition.account.as_ref().ok_or_else(|| {
441        Box::new(ConfigError(format!(
442            "storage {} requires account for type adls",
443            definition.name
444        )))
445    })?;
446    let container = definition.container.as_ref().ok_or_else(|| {
447        Box::new(ConfigError(format!(
448            "storage {} requires container for type adls",
449            definition.name
450        )))
451    })?;
452    if let Some((container_in_path, account_in_path, path)) = parse_adls_uri(raw_path) {
453        if container_in_path != *container || account_in_path != *account {
454            return Err(Box::new(ConfigError(format!(
455                "storage {} adls account/container mismatch",
456                definition.name
457            ))));
458        }
459        return Ok(format_adls_uri(container, account, &path));
460    }
461    let prefix = definition.prefix.as_deref().unwrap_or("");
462    let combined = join_adls_path(prefix, raw_path);
463    Ok(format_adls_uri(container, account, &combined))
464}
465
/// Joins an ADLS prefix and a path with exactly one `/` between them.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of
/// `raw_path`; an empty side contributes nothing (and no separator).
fn join_adls_path(prefix: &str, raw_path: &str) -> String {
    let head = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');

    let mut joined = String::with_capacity(head.len() + tail.len() + 1);
    joined.push_str(head);
    if !head.is_empty() && !tail.is_empty() {
        joined.push('/');
    }
    joined.push_str(tail);
    joined
}
476
/// Formats an `abfs://container@account.dfs.core.windows.net[/path]` URI.
///
/// No trailing `/` is emitted when `path` is empty.
fn format_adls_uri(container: &str, account: &str, path: &str) -> String {
    let mut uri = format!("abfs://{}@{}.dfs.core.windows.net", container, account);
    if !path.is_empty() {
        uri.push('/');
        uri.push_str(path);
    }
    uri
}
487
/// Splits an `s3://bucket/key` URI into `(bucket, key)`.
///
/// Returns `None` when the `s3://` scheme is missing or the bucket is empty.
/// The key is empty when the URI has nothing after the bucket.
fn parse_s3_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("s3://")?;
    let (bucket, key) = match rest.split_once('/') {
        Some(pair) => pair,
        None => (rest, ""),
    };
    if bucket.is_empty() {
        None
    } else {
        Some((bucket.to_string(), key.to_string()))
    }
}
498
/// Joins an S3 key prefix and a path with exactly one `/` between them.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of
/// `raw_path`; an empty side contributes nothing (and no separator).
fn join_s3_key(prefix: &str, raw_path: &str) -> String {
    let head = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');

    if head.is_empty() {
        return tail.to_string();
    }
    if tail.is_empty() {
        return head.to_string();
    }
    format!("{}/{}", head, tail)
}
509
/// Formats an `s3://bucket[/key]` URI; no trailing `/` for an empty key.
fn format_s3_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("s3://{}", bucket),
        _ => format!("s3://{}/{}", bucket, key),
    }
}
517
518fn resolve_gcs_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
519    let bucket = definition.bucket.as_ref().ok_or_else(|| {
520        Box::new(ConfigError(format!(
521            "storage {} requires bucket for type gcs",
522            definition.name
523        )))
524    })?;
525    if let Some((bucket_in_path, key)) = parse_gcs_uri(raw_path) {
526        if bucket_in_path != *bucket {
527            return Err(Box::new(ConfigError(format!(
528                "storage {} bucket mismatch: {}",
529                definition.name, bucket_in_path
530            ))));
531        }
532        return Ok(format_gcs_uri(bucket, &key));
533    }
534
535    let key = join_gcs_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
536    Ok(format_gcs_uri(bucket, &key))
537}
538
/// Splits a `gs://bucket/key` URI into `(bucket, key)`.
///
/// Returns `None` when the `gs://` scheme is missing or the bucket is empty.
/// The key is empty when the URI has nothing after the bucket.
fn parse_gcs_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("gs://")?;
    let (bucket, key) = match rest.find('/') {
        Some(slash) => (&rest[..slash], &rest[slash + 1..]),
        None => (rest, ""),
    };
    if bucket.is_empty() {
        return None;
    }
    Some((bucket.to_string(), key.to_string()))
}
549
/// Joins a GCS key prefix and a path with exactly one `/` between them.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of
/// `raw_path`; an empty side contributes nothing (and no separator).
fn join_gcs_key(prefix: &str, raw_path: &str) -> String {
    let segments = [prefix.trim_matches('/'), raw_path.trim_start_matches('/')];
    segments
        .iter()
        .copied()
        .filter(|segment| !segment.is_empty())
        .collect::<Vec<_>>()
        .join("/")
}
560
/// Formats a `gs://bucket[/key]` URI; no trailing `/` for an empty key.
fn format_gcs_uri(bucket: &str, key: &str) -> String {
    let mut uri = format!("gs://{}", bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
568
/// Splits an `abfs://container@account.dfs.core.windows.net/path` URI into
/// `(container, account, path)`.
///
/// Leading slashes are stripped from the returned path, which is empty when
/// the URI ends at the host.
///
/// Returns `None` when:
/// - the `abfs://` scheme, the `@` separator or the host suffix is missing;
/// - the container or the account is empty or contains a `/`;
/// - the host does not end exactly at `.dfs.core.windows.net`, i.e. the
///   suffix is followed by anything other than `/` or the end of the string.
fn parse_adls_uri(value: &str) -> Option<(String, String, String)> {
    const HOST_SUFFIX: &str = ".dfs.core.windows.net";

    let stripped = value.strip_prefix("abfs://")?;
    let (container, rest) = stripped.split_once('@')?;
    let (account, remainder) = rest.split_once(HOST_SUFFIX)?;

    if container.is_empty() || account.is_empty() {
        return None;
    }
    // The authority ends at the first `/`; a slash inside either part means
    // the `@` or the host suffix was actually found inside the path.
    if container.contains('/') || account.contains('/') {
        return None;
    }
    // Anchor the suffix: without this check
    // `acct.dfs.core.windows.net.evil.example` would be read as account
    // `acct` with a path of `.evil.example`.
    if !(remainder.is_empty() || remainder.starts_with('/')) {
        return None;
    }

    let path = remainder.trim_start_matches('/').to_string();
    Some((container.to_string(), account.to_string(), path))
}
579
/// Returns everything before the last `/` of `key`, or an empty string when
/// `key` contains no `/`.
fn parent_prefix(key: &str) -> String {
    key.rfind('/')
        .map_or_else(String::new, |slash| key[..slash].to_string())
}
586
/// Returns `true` when `value` starts with one of the supported remote
/// schemes: `s3://`, `gs://` or `abfs://`.
fn is_remote_uri(value: &str) -> bool {
    ["s3://", "gs://", "abfs://"]
        .iter()
        .any(|scheme| value.starts_with(*scheme))
}
590
591fn is_relative_path(value: &str) -> bool {
592    !is_remote_uri(value) && Path::new(value).is_relative()
593}