Skip to main content

floe_core/config/
storage.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::config::{RootConfig, StorageDefinition};
5use crate::{ConfigError, FloeResult};
6
/// Result of resolving an entity path against a storage definition.
///
/// Derives `PartialEq`/`Eq` so resolved paths can be compared directly
/// (for example in assertions) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedPath {
    /// Name of the storage the path was resolved against.
    pub storage: String,
    /// Fully qualified URI for the path (`local://`, `s3://`, `abfs://` or `gs://`).
    pub uri: String,
    /// Filesystem location; populated only when the storage type is `local`.
    pub local_path: Option<PathBuf>,
}
13
14pub struct StorageResolver {
15    config_dir: PathBuf,
16    default_name: String,
17    definitions: HashMap<String, StorageDefinition>,
18    has_config: bool,
19}
20
21impl StorageResolver {
22    pub fn new(config: &RootConfig, config_path: &Path) -> FloeResult<Self> {
23        let config_dir = config_path
24            .parent()
25            .unwrap_or_else(|| Path::new("."))
26            .to_path_buf();
27        if let Some(storages) = &config.storages {
28            let mut definitions = HashMap::new();
29            for definition in &storages.definitions {
30                if definitions
31                    .insert(definition.name.clone(), definition.clone())
32                    .is_some()
33                {
34                    return Err(Box::new(ConfigError(format!(
35                        "storages.definitions name={} is duplicated",
36                        definition.name
37                    ))));
38                }
39            }
40            let default_name = storages
41                .default
42                .clone()
43                .ok_or_else(|| Box::new(ConfigError("storages.default is required".to_string())))?;
44            if !definitions.contains_key(&default_name) {
45                return Err(Box::new(ConfigError(format!(
46                    "storages.default={} does not match any definition",
47                    default_name
48                ))));
49            }
50            Ok(Self {
51                config_dir,
52                default_name,
53                definitions,
54                has_config: true,
55            })
56        } else {
57            Ok(Self {
58                config_dir,
59                default_name: "local".to_string(),
60                definitions: HashMap::new(),
61                has_config: false,
62            })
63        }
64    }
65
66    pub fn resolve_path(
67        &self,
68        entity_name: &str,
69        field: &str,
70        storage_name: Option<&str>,
71        raw_path: &str,
72    ) -> FloeResult<ResolvedPath> {
73        let name = storage_name.unwrap_or(self.default_name.as_str());
74        if !self.has_config && name != "local" {
75            return Err(Box::new(ConfigError(format!(
76                "entity.name={} {field} references unknown storage {} (no storages block)",
77                entity_name, name
78            ))));
79        }
80
81        let definition = if self.has_config {
82            self.definitions.get(name).cloned().ok_or_else(|| {
83                Box::new(ConfigError(format!(
84                    "entity.name={} {field} references unknown storage {}",
85                    entity_name, name
86                )))
87            })?
88        } else {
89            StorageDefinition {
90                name: "local".to_string(),
91                fs_type: "local".to_string(),
92                bucket: None,
93                region: None,
94                account: None,
95                container: None,
96                prefix: None,
97            }
98        };
99
100        match definition.fs_type.as_str() {
101            "local" => {
102                let resolved = resolve_local_path(&self.config_dir, raw_path);
103                Ok(ResolvedPath {
104                    storage: name.to_string(),
105                    uri: local_uri(&resolved),
106                    local_path: Some(resolved),
107                })
108            }
109            "s3" => {
110                let uri = resolve_s3_uri(&definition, raw_path)?;
111                Ok(ResolvedPath {
112                    storage: name.to_string(),
113                    uri,
114                    local_path: None,
115                })
116            }
117            "adls" => {
118                let uri = resolve_adls_uri(&definition, raw_path)?;
119                Ok(ResolvedPath {
120                    storage: name.to_string(),
121                    uri,
122                    local_path: None,
123                })
124            }
125            "gcs" => {
126                let uri = resolve_gcs_uri(&definition, raw_path)?;
127                Ok(ResolvedPath {
128                    storage: name.to_string(),
129                    uri,
130                    local_path: None,
131                })
132            }
133            _ => Err(Box::new(ConfigError(format!(
134                "storage type {} is unsupported",
135                definition.fs_type
136            )))),
137        }
138    }
139
140    pub fn definition(&self, name: &str) -> Option<StorageDefinition> {
141        if self.has_config {
142            self.definitions.get(name).cloned()
143        } else if name == "local" {
144            Some(StorageDefinition {
145                name: "local".to_string(),
146                fs_type: "local".to_string(),
147                bucket: None,
148                region: None,
149                account: None,
150                container: None,
151                prefix: None,
152            })
153        } else {
154            None
155        }
156    }
157}
158
/// Resolves a local `raw_path` relative to the configuration directory.
///
/// Absolute paths are returned unchanged; relative paths are anchored at
/// `config_dir`.
pub fn resolve_local_path(config_dir: &Path, raw_path: &str) -> PathBuf {
    // `Path::join` discards the base when handed an absolute path, so a single
    // call covers both the absolute and the relative case.
    config_dir.join(raw_path)
}
167
/// Renders a filesystem path as a `local://` URI.
fn local_uri(path: &Path) -> String {
    let rendered = path.display().to_string();
    let mut uri = String::with_capacity("local://".len() + rendered.len());
    uri.push_str("local://");
    uri.push_str(&rendered);
    uri
}
171
172fn resolve_s3_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
173    let bucket = definition.bucket.as_ref().ok_or_else(|| {
174        Box::new(ConfigError(format!(
175            "storage {} requires bucket for type s3",
176            definition.name
177        )))
178    })?;
179    if let Some((bucket_in_path, key)) = parse_s3_uri(raw_path) {
180        if bucket_in_path != *bucket {
181            return Err(Box::new(ConfigError(format!(
182                "storage {} bucket mismatch: {}",
183                definition.name, bucket_in_path
184            ))));
185        }
186        return Ok(format_s3_uri(bucket, &key));
187    }
188
189    let key = join_s3_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
190    Ok(format_s3_uri(bucket, &key))
191}
192
193fn resolve_adls_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
194    let account = definition.account.as_ref().ok_or_else(|| {
195        Box::new(ConfigError(format!(
196            "storage {} requires account for type adls",
197            definition.name
198        )))
199    })?;
200    let container = definition.container.as_ref().ok_or_else(|| {
201        Box::new(ConfigError(format!(
202            "storage {} requires container for type adls",
203            definition.name
204        )))
205    })?;
206    let prefix = definition.prefix.as_deref().unwrap_or("");
207    let combined = join_adls_path(prefix, raw_path);
208    Ok(format_adls_uri(container, account, &combined))
209}
210
/// Joins a storage prefix and an entity path into a single ADLS path.
///
/// Slashes around the prefix and leading slashes of `raw_path` are dropped;
/// a separator is inserted only when both parts are non-empty.
fn join_adls_path(prefix: &str, raw_path: &str) -> String {
    let base = prefix.trim_matches('/');
    let relative = raw_path.trim_start_matches('/');
    if base.is_empty() {
        return relative.to_string();
    }
    if relative.is_empty() {
        return base.to_string();
    }
    format!("{}/{}", base, relative)
}
221
/// Formats an `abfs://` URI for `path` inside `container` of `account`.
///
/// An empty `path` yields the container root without a trailing slash.
fn format_adls_uri(container: &str, account: &str, path: &str) -> String {
    let mut uri = format!("abfs://{}@{}.dfs.core.windows.net", container, account);
    if !path.is_empty() {
        uri.push('/');
        uri.push_str(path);
    }
    uri
}
232
/// Splits an `s3://bucket/key` URI into `(bucket, key)`.
///
/// Returns `None` when `value` lacks the `s3://` scheme or names no bucket.
/// The key is empty when the URI contains nothing after the bucket.
fn parse_s3_uri(value: &str) -> Option<(String, String)> {
    let remainder = value.strip_prefix("s3://")?;
    let (bucket, key) = remainder.split_once('/').unwrap_or((remainder, ""));
    if bucket.is_empty() {
        return None;
    }
    Some((bucket.to_string(), key.to_string()))
}
243
/// Joins a storage prefix and an entity path into a single S3 object key.
///
/// Slashes around the prefix and leading slashes of `raw_path` are dropped;
/// empty parts are skipped so no stray separator is produced.
fn join_s3_key(prefix: &str, raw_path: &str) -> String {
    let parts = [prefix.trim_matches('/'), raw_path.trim_start_matches('/')];
    parts
        .iter()
        .copied()
        .filter(|part| !part.is_empty())
        .collect::<Vec<&str>>()
        .join("/")
}
254
/// Formats an `s3://` URI for `key` inside `bucket`.
///
/// An empty `key` yields the bare bucket URI without a trailing slash.
fn format_s3_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("s3://{}", bucket),
        object_key => format!("s3://{}/{}", bucket, object_key),
    }
}
262
263fn resolve_gcs_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
264    let bucket = definition.bucket.as_ref().ok_or_else(|| {
265        Box::new(ConfigError(format!(
266            "storage {} requires bucket for type gcs",
267            definition.name
268        )))
269    })?;
270    if let Some((bucket_in_path, key)) = parse_gcs_uri(raw_path) {
271        if bucket_in_path != *bucket {
272            return Err(Box::new(ConfigError(format!(
273                "storage {} bucket mismatch: {}",
274                definition.name, bucket_in_path
275            ))));
276        }
277        return Ok(format_gcs_uri(bucket, &key));
278    }
279
280    let key = join_gcs_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
281    Ok(format_gcs_uri(bucket, &key))
282}
283
/// Splits a `gs://bucket/key` URI into `(bucket, key)`.
///
/// Returns `None` when `value` lacks the `gs://` scheme or names no bucket.
/// The key is empty when the URI contains nothing after the bucket.
fn parse_gcs_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("gs://")?;
    // '/' is a single byte, so slicing right after it stays on a char boundary.
    let (bucket, key) = match rest.find('/') {
        Some(slash) => (&rest[..slash], &rest[slash + 1..]),
        None => (rest, ""),
    };
    if bucket.is_empty() {
        None
    } else {
        Some((bucket.to_string(), key.to_string()))
    }
}
294
/// Joins a storage prefix and an entity path into a single GCS object name.
///
/// Slashes around the prefix and leading slashes of `raw_path` are dropped;
/// a separator is inserted only when both parts are non-empty.
fn join_gcs_key(prefix: &str, raw_path: &str) -> String {
    match (prefix.trim_matches('/'), raw_path.trim_start_matches('/')) {
        ("", object) => object.to_string(),
        (base, "") => base.to_string(),
        (base, object) => format!("{}/{}", base, object),
    }
}
305
/// Formats a `gs://` URI for `key` inside `bucket`.
///
/// An empty `key` yields the bare bucket URI without a trailing slash.
fn format_gcs_uri(bucket: &str, key: &str) -> String {
    let mut uri = String::from("gs://");
    uri.push_str(bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}