1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::config::{RootConfig, StorageDefinition};
5use crate::{ConfigError, FloeResult};
6
/// Outcome of resolving a configured path against a storage definition.
#[derive(Clone, Debug)]
pub struct ResolvedPath {
    /// Name of the storage the path was resolved against.
    pub storage: String,
    /// Fully qualified URI for the resolved location.
    pub uri: String,
    /// Filesystem path of the resolved location, when one exists.
    pub local_path: Option<PathBuf>,
}
13
14pub struct StorageResolver {
15 config_dir: PathBuf,
16 default_name: String,
17 definitions: HashMap<String, StorageDefinition>,
18 has_config: bool,
19}
20
21impl StorageResolver {
22 pub fn new(config: &RootConfig, config_path: &Path) -> FloeResult<Self> {
23 let config_dir = config_path
24 .parent()
25 .unwrap_or_else(|| Path::new("."))
26 .to_path_buf();
27 if let Some(storages) = &config.storages {
28 let mut definitions = HashMap::new();
29 for definition in &storages.definitions {
30 if definitions
31 .insert(definition.name.clone(), definition.clone())
32 .is_some()
33 {
34 return Err(Box::new(ConfigError(format!(
35 "storages.definitions name={} is duplicated",
36 definition.name
37 ))));
38 }
39 }
40 let default_name = storages
41 .default
42 .clone()
43 .ok_or_else(|| Box::new(ConfigError("storages.default is required".to_string())))?;
44 if !definitions.contains_key(&default_name) {
45 return Err(Box::new(ConfigError(format!(
46 "storages.default={} does not match any definition",
47 default_name
48 ))));
49 }
50 Ok(Self {
51 config_dir,
52 default_name,
53 definitions,
54 has_config: true,
55 })
56 } else {
57 Ok(Self {
58 config_dir,
59 default_name: "local".to_string(),
60 definitions: HashMap::new(),
61 has_config: false,
62 })
63 }
64 }
65
66 pub fn resolve_path(
67 &self,
68 entity_name: &str,
69 field: &str,
70 storage_name: Option<&str>,
71 raw_path: &str,
72 ) -> FloeResult<ResolvedPath> {
73 let name = storage_name.unwrap_or(self.default_name.as_str());
74 if !self.has_config && name != "local" {
75 return Err(Box::new(ConfigError(format!(
76 "entity.name={} {field} references unknown storage {} (no storages block)",
77 entity_name, name
78 ))));
79 }
80
81 let definition = if self.has_config {
82 self.definitions.get(name).cloned().ok_or_else(|| {
83 Box::new(ConfigError(format!(
84 "entity.name={} {field} references unknown storage {}",
85 entity_name, name
86 )))
87 })?
88 } else {
89 StorageDefinition {
90 name: "local".to_string(),
91 fs_type: "local".to_string(),
92 bucket: None,
93 region: None,
94 account: None,
95 container: None,
96 prefix: None,
97 }
98 };
99
100 match definition.fs_type.as_str() {
101 "local" => {
102 let resolved = resolve_local_path(&self.config_dir, raw_path);
103 Ok(ResolvedPath {
104 storage: name.to_string(),
105 uri: local_uri(&resolved),
106 local_path: Some(resolved),
107 })
108 }
109 "s3" => {
110 let uri = resolve_s3_uri(&definition, raw_path)?;
111 Ok(ResolvedPath {
112 storage: name.to_string(),
113 uri,
114 local_path: None,
115 })
116 }
117 "adls" => {
118 let uri = resolve_adls_uri(&definition, raw_path)?;
119 Ok(ResolvedPath {
120 storage: name.to_string(),
121 uri,
122 local_path: None,
123 })
124 }
125 "gcs" => {
126 let uri = resolve_gcs_uri(&definition, raw_path)?;
127 Ok(ResolvedPath {
128 storage: name.to_string(),
129 uri,
130 local_path: None,
131 })
132 }
133 _ => Err(Box::new(ConfigError(format!(
134 "storage type {} is unsupported",
135 definition.fs_type
136 )))),
137 }
138 }
139
140 pub fn definition(&self, name: &str) -> Option<StorageDefinition> {
141 if self.has_config {
142 self.definitions.get(name).cloned()
143 } else if name == "local" {
144 Some(StorageDefinition {
145 name: "local".to_string(),
146 fs_type: "local".to_string(),
147 bucket: None,
148 region: None,
149 account: None,
150 container: None,
151 prefix: None,
152 })
153 } else {
154 None
155 }
156 }
157}
158
/// Turns `raw_path` into a path usable from the current process.
///
/// Absolute paths are returned unchanged; anything else is joined onto
/// `config_dir`.
pub fn resolve_local_path(config_dir: &Path, raw_path: &str) -> PathBuf {
    let candidate = PathBuf::from(raw_path);
    if candidate.is_relative() {
        return config_dir.join(candidate);
    }
    candidate
}
167
/// Renders a filesystem path as a `local://` URI.
fn local_uri(path: &Path) -> String {
    let rendered = path.display().to_string();
    format!("local://{rendered}")
}
171
172fn resolve_s3_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
173 let bucket = definition.bucket.as_ref().ok_or_else(|| {
174 Box::new(ConfigError(format!(
175 "storage {} requires bucket for type s3",
176 definition.name
177 )))
178 })?;
179 if let Some((bucket_in_path, key)) = parse_s3_uri(raw_path) {
180 if bucket_in_path != *bucket {
181 return Err(Box::new(ConfigError(format!(
182 "storage {} bucket mismatch: {}",
183 definition.name, bucket_in_path
184 ))));
185 }
186 return Ok(format_s3_uri(bucket, &key));
187 }
188
189 let key = join_s3_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
190 Ok(format_s3_uri(bucket, &key))
191}
192
193fn resolve_adls_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
194 let account = definition.account.as_ref().ok_or_else(|| {
195 Box::new(ConfigError(format!(
196 "storage {} requires account for type adls",
197 definition.name
198 )))
199 })?;
200 let container = definition.container.as_ref().ok_or_else(|| {
201 Box::new(ConfigError(format!(
202 "storage {} requires container for type adls",
203 definition.name
204 )))
205 })?;
206 let prefix = definition.prefix.as_deref().unwrap_or("");
207 let combined = join_adls_path(prefix, raw_path);
208 Ok(format_adls_uri(container, account, &combined))
209}
210
/// Joins a storage prefix and a path with exactly one `/` between them.
///
/// Slashes are stripped from both ends of `prefix` and from the start of
/// `raw_path`; an empty side contributes nothing (and no separator).
fn join_adls_path(prefix: &str, raw_path: &str) -> String {
    let base = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');
    if base.is_empty() {
        tail.to_string()
    } else if tail.is_empty() {
        base.to_string()
    } else {
        format!("{}/{}", base, tail)
    }
}
221
/// Formats an `abfs://` URI; the path segment is appended only when non-empty.
fn format_adls_uri(container: &str, account: &str, path: &str) -> String {
    let mut uri = format!("abfs://{}@{}.dfs.core.windows.net", container, account);
    if !path.is_empty() {
        uri.push('/');
        uri.push_str(path);
    }
    uri
}
232
/// Splits an `s3://bucket/key` URI into `(bucket, key)`.
///
/// Returns `None` when the `s3://` scheme is missing or the bucket is empty.
/// The key is empty when the URI has no `/` after the bucket.
fn parse_s3_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("s3://")?;
    let (bucket, key) = rest.split_once('/').unwrap_or((rest, ""));
    if bucket.is_empty() {
        None
    } else {
        Some((bucket.to_owned(), key.to_owned()))
    }
}
243
/// Joins a storage prefix and a path into an S3 object key.
///
/// Slashes are stripped from both ends of `prefix` and from the start of
/// `raw_path`; the non-empty pieces are then joined with a single `/`.
fn join_s3_key(prefix: &str, raw_path: &str) -> String {
    let pieces = [
        prefix.trim_matches('/'),
        raw_path.trim_start_matches('/'),
    ];
    pieces
        .iter()
        .copied()
        .filter(|piece| !piece.is_empty())
        .collect::<Vec<_>>()
        .join("/")
}
254
/// Formats an `s3://` URI; the key segment is appended only when non-empty.
fn format_s3_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("s3://{}", bucket),
        _ => format!("s3://{}/{}", bucket, key),
    }
}
262
263fn resolve_gcs_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
264 let bucket = definition.bucket.as_ref().ok_or_else(|| {
265 Box::new(ConfigError(format!(
266 "storage {} requires bucket for type gcs",
267 definition.name
268 )))
269 })?;
270 if let Some((bucket_in_path, key)) = parse_gcs_uri(raw_path) {
271 if bucket_in_path != *bucket {
272 return Err(Box::new(ConfigError(format!(
273 "storage {} bucket mismatch: {}",
274 definition.name, bucket_in_path
275 ))));
276 }
277 return Ok(format_gcs_uri(bucket, &key));
278 }
279
280 let key = join_gcs_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
281 Ok(format_gcs_uri(bucket, &key))
282}
283
/// Splits a `gs://bucket/key` URI into `(bucket, key)`.
///
/// Returns `None` when the `gs://` scheme is missing or the bucket is empty.
/// The key is empty when the URI has no `/` after the bucket.
fn parse_gcs_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("gs://")?;
    // '/' is a single byte, so slicing around its index stays on char boundaries.
    let (bucket, key) = match rest.find('/') {
        Some(slash) => (&rest[..slash], &rest[slash + 1..]),
        None => (rest, ""),
    };
    if bucket.is_empty() {
        return None;
    }
    Some((String::from(bucket), String::from(key)))
}
294
/// Joins a storage prefix and a path into a GCS object key.
///
/// Slashes are stripped from both ends of `prefix` and from the start of
/// `raw_path`; a `/` separator is inserted only when both sides are non-empty.
fn join_gcs_key(prefix: &str, raw_path: &str) -> String {
    let head = prefix.trim_matches('/');
    let rest = raw_path.trim_start_matches('/');

    let mut key = String::with_capacity(head.len() + rest.len() + 1);
    key.push_str(head);
    if !head.is_empty() && !rest.is_empty() {
        key.push('/');
    }
    key.push_str(rest);
    key
}
305
/// Formats a `gs://` URI; the key segment is appended only when non-empty.
fn format_gcs_uri(bucket: &str, key: &str) -> String {
    let mut uri = String::from("gs://");
    uri.push_str(bucket);
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}