1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3
4use crate::config::{RootConfig, StorageDefinition};
5use crate::{ConfigError, FloeResult};
6
7#[derive(Debug, Clone)]
8pub struct ConfigBase {
9 local_dir: PathBuf,
10 remote_base: Option<RemoteConfigBase>,
11}
12
13impl ConfigBase {
14 pub fn local_from_path(path: &Path) -> Self {
15 let local_dir = path
16 .parent()
17 .unwrap_or_else(|| Path::new("."))
18 .to_path_buf();
19 Self {
20 local_dir,
21 remote_base: None,
22 }
23 }
24
25 pub fn remote_from_uri(local_dir: PathBuf, uri: &str) -> FloeResult<Self> {
26 Ok(Self {
27 local_dir,
28 remote_base: Some(RemoteConfigBase::from_uri(uri)?),
29 })
30 }
31
32 pub fn local_dir(&self) -> &Path {
33 &self.local_dir
34 }
35
36 pub fn remote_base(&self) -> Option<&RemoteConfigBase> {
37 self.remote_base.as_ref()
38 }
39}
40
/// Object-store family that a remote config file was loaded from.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RemoteScheme {
    /// Amazon S3 (`s3://bucket/key`).
    S3,
    /// Google Cloud Storage (`gs://bucket/key`).
    Gcs,
    /// Azure Data Lake Storage (`abfs://container@account.dfs.core.windows.net/path`).
    Adls,
}
47
48#[derive(Debug, Clone)]
49pub struct RemoteConfigBase {
50 scheme: RemoteScheme,
51 bucket: String,
52 account: Option<String>,
53 container: Option<String>,
54 prefix: String,
55}
56
57impl RemoteConfigBase {
58 fn from_uri(uri: &str) -> FloeResult<Self> {
59 if let Some((bucket, key)) = parse_s3_uri(uri) {
60 let prefix = parent_prefix(&key);
61 return Ok(Self {
62 scheme: RemoteScheme::S3,
63 bucket,
64 account: None,
65 container: None,
66 prefix,
67 });
68 }
69 if let Some((bucket, key)) = parse_gcs_uri(uri) {
70 let prefix = parent_prefix(&key);
71 return Ok(Self {
72 scheme: RemoteScheme::Gcs,
73 bucket,
74 account: None,
75 container: None,
76 prefix,
77 });
78 }
79 if let Some((container, account, path)) = parse_adls_uri(uri) {
80 let prefix = parent_prefix(&path);
81 return Ok(Self {
82 scheme: RemoteScheme::Adls,
83 bucket: String::new(),
84 account: Some(account),
85 container: Some(container),
86 prefix,
87 });
88 }
89 Err(Box::new(ConfigError(format!(
90 "unsupported config uri: {}",
91 uri
92 ))))
93 }
94
95 fn matches_storage(&self, storage_type: &str) -> bool {
96 matches!(
97 (self.scheme, storage_type),
98 (RemoteScheme::S3, "s3") | (RemoteScheme::Gcs, "gcs") | (RemoteScheme::Adls, "adls")
99 )
100 }
101
102 fn join(&self, relative: &str) -> String {
103 match self.scheme {
104 RemoteScheme::S3 => {
105 let key = join_s3_key(&self.prefix, relative);
106 format_s3_uri(&self.bucket, &key)
107 }
108 RemoteScheme::Gcs => {
109 let key = join_s3_key(&self.prefix, relative);
110 format_gcs_uri(&self.bucket, &key)
111 }
112 RemoteScheme::Adls => {
113 let combined = join_adls_path(&self.prefix, relative);
114 let container = self.container.as_ref().expect("container");
115 let account = self.account.as_ref().expect("account");
116 format_adls_uri(container, account, &combined)
117 }
118 }
119 }
120}
121
/// Outcome of resolving a configured path against a storage.
#[derive(Clone, Debug)]
pub struct ResolvedPath {
    /// Name of the storage the path was resolved against.
    pub storage: String,
    /// Fully qualified URI (`local://`, `s3://`, `gs://` or `abfs://`).
    pub uri: String,
    /// Filesystem path; `Some` only when the storage is local.
    pub local_path: Option<PathBuf>,
}
128
129pub struct StorageResolver {
130 config_base: ConfigBase,
131 default_name: String,
132 definitions: HashMap<String, StorageDefinition>,
133 has_config: bool,
134}
135
136impl StorageResolver {
137 pub fn new(config: &RootConfig, config_base: ConfigBase) -> FloeResult<Self> {
138 if let Some(storages) = &config.storages {
139 let mut definitions = HashMap::new();
140 for definition in &storages.definitions {
141 if definitions
142 .insert(definition.name.clone(), definition.clone())
143 .is_some()
144 {
145 return Err(Box::new(ConfigError(format!(
146 "storages.definitions name={} is duplicated",
147 definition.name
148 ))));
149 }
150 }
151 let default_name = storages
152 .default
153 .clone()
154 .ok_or_else(|| Box::new(ConfigError("storages.default is required".to_string())))?;
155 if !definitions.contains_key(&default_name) {
156 return Err(Box::new(ConfigError(format!(
157 "storages.default={} does not match any definition",
158 default_name
159 ))));
160 }
161 Ok(Self {
162 config_base,
163 default_name,
164 definitions,
165 has_config: true,
166 })
167 } else {
168 Ok(Self {
169 config_base,
170 default_name: "local".to_string(),
171 definitions: HashMap::new(),
172 has_config: false,
173 })
174 }
175 }
176
177 pub fn from_path(config: &RootConfig, config_path: &Path) -> FloeResult<Self> {
178 Self::new(config, ConfigBase::local_from_path(config_path))
179 }
180
181 pub fn resolve_path(
182 &self,
183 entity_name: &str,
184 field: &str,
185 storage_name: Option<&str>,
186 raw_path: &str,
187 ) -> FloeResult<ResolvedPath> {
188 let name = storage_name.unwrap_or(self.default_name.as_str());
189 if !self.has_config && name != "local" {
190 return Err(Box::new(ConfigError(format!(
191 "entity.name={} {field} references unknown storage {} (no storages block)",
192 entity_name, name
193 ))));
194 }
195
196 let definition = if self.has_config {
197 self.definitions.get(name).cloned().ok_or_else(|| {
198 Box::new(ConfigError(format!(
199 "entity.name={} {field} references unknown storage {}",
200 entity_name, name
201 )))
202 })?
203 } else {
204 StorageDefinition {
205 name: "local".to_string(),
206 fs_type: "local".to_string(),
207 bucket: None,
208 region: None,
209 account: None,
210 container: None,
211 prefix: None,
212 }
213 };
214
215 let resolved_remote = self
216 .resolve_remote_relative(&definition, raw_path)
217 .unwrap_or_else(|| raw_path.to_string());
218 let raw_path = resolved_remote.as_str();
219
220 match definition.fs_type.as_str() {
221 "local" => {
222 if is_remote_uri(raw_path) {
223 return Err(Box::new(ConfigError(format!(
224 "entity.name={} {field} must be a local path (got {})",
225 entity_name, raw_path
226 ))));
227 }
228 if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
229 return Err(Box::new(ConfigError(format!(
230 "entity.name={} {field} must be absolute when config is remote",
231 entity_name
232 ))));
233 }
234 let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
235 Ok(ResolvedPath {
236 storage: name.to_string(),
237 uri: local_uri(&resolved),
238 local_path: Some(resolved),
239 })
240 }
241 "s3" => {
242 let uri = resolve_s3_uri(&definition, raw_path)?;
243 Ok(ResolvedPath {
244 storage: name.to_string(),
245 uri,
246 local_path: None,
247 })
248 }
249 "adls" => {
250 let uri = resolve_adls_uri(&definition, raw_path)?;
251 Ok(ResolvedPath {
252 storage: name.to_string(),
253 uri,
254 local_path: None,
255 })
256 }
257 "gcs" => {
258 let uri = resolve_gcs_uri(&definition, raw_path)?;
259 Ok(ResolvedPath {
260 storage: name.to_string(),
261 uri,
262 local_path: None,
263 })
264 }
265 _ => Err(Box::new(ConfigError(format!(
266 "storage type {} is unsupported",
267 definition.fs_type
268 )))),
269 }
270 }
271
272 pub fn resolve_report_path(
273 &self,
274 storage_name: Option<&str>,
275 raw_path: &str,
276 ) -> FloeResult<ResolvedPath> {
277 let name = storage_name.unwrap_or(self.default_name.as_str());
278 if !self.has_config && name != "local" {
279 return Err(Box::new(ConfigError(format!(
280 "report.storage references unknown storage {} (no storages block)",
281 name
282 ))));
283 }
284
285 let definition = if self.has_config {
286 self.definitions.get(name).cloned().ok_or_else(|| {
287 Box::new(ConfigError(format!(
288 "report.storage references unknown storage {}",
289 name
290 )))
291 })?
292 } else {
293 StorageDefinition {
294 name: "local".to_string(),
295 fs_type: "local".to_string(),
296 bucket: None,
297 region: None,
298 account: None,
299 container: None,
300 prefix: None,
301 }
302 };
303
304 let resolved_remote = self
305 .resolve_remote_relative(&definition, raw_path)
306 .unwrap_or_else(|| raw_path.to_string());
307 let raw_path = resolved_remote.as_str();
308
309 match definition.fs_type.as_str() {
310 "local" => {
311 if is_remote_uri(raw_path) {
312 return Err(Box::new(ConfigError(format!(
313 "report.path must be a local path (got {})",
314 raw_path
315 ))));
316 }
317 if self.config_base.remote_base().is_some() && Path::new(raw_path).is_relative() {
318 return Err(Box::new(ConfigError(
319 "report.path must be absolute when config is remote".to_string(),
320 )));
321 }
322 let resolved = resolve_local_path(self.config_base.local_dir(), raw_path);
323 Ok(ResolvedPath {
324 storage: name.to_string(),
325 uri: local_uri(&resolved),
326 local_path: Some(resolved),
327 })
328 }
329 "s3" => {
330 let uri = resolve_s3_uri(&definition, raw_path)?;
331 Ok(ResolvedPath {
332 storage: name.to_string(),
333 uri,
334 local_path: None,
335 })
336 }
337 "adls" => {
338 let uri = resolve_adls_uri(&definition, raw_path)?;
339 Ok(ResolvedPath {
340 storage: name.to_string(),
341 uri,
342 local_path: None,
343 })
344 }
345 "gcs" => {
346 let uri = resolve_gcs_uri(&definition, raw_path)?;
347 Ok(ResolvedPath {
348 storage: name.to_string(),
349 uri,
350 local_path: None,
351 })
352 }
353 _ => Err(Box::new(ConfigError(format!(
354 "storage type {} is unsupported",
355 definition.fs_type
356 )))),
357 }
358 }
359
360 pub fn definition(&self, name: &str) -> Option<StorageDefinition> {
361 if self.has_config {
362 self.definitions.get(name).cloned()
363 } else if name == "local" {
364 Some(StorageDefinition {
365 name: "local".to_string(),
366 fs_type: "local".to_string(),
367 bucket: None,
368 region: None,
369 account: None,
370 container: None,
371 prefix: None,
372 })
373 } else {
374 None
375 }
376 }
377
378 pub fn default_storage_name(&self) -> &str {
379 self.default_name.as_str()
380 }
381
382 pub fn config_dir(&self) -> &Path {
383 self.config_base.local_dir()
384 }
385
386 fn resolve_remote_relative(
387 &self,
388 definition: &StorageDefinition,
389 raw_path: &str,
390 ) -> Option<String> {
391 if !is_relative_path(raw_path) {
392 return None;
393 }
394 if definition.prefix.is_some() {
395 return None;
396 }
397 let remote = self.config_base.remote_base()?;
398 if !remote.matches_storage(definition.fs_type.as_str()) {
399 return None;
400 }
401 Some(remote.join(raw_path))
402 }
403}
404
405pub fn resolve_local_path(config_dir: &Path, raw_path: &str) -> PathBuf {
406 let path = Path::new(raw_path);
407 let resolved = if path.is_absolute() {
408 path.to_path_buf()
409 } else {
410 config_dir.join(path)
411 };
412 crate::io::storage::paths::normalize_local_path(&resolved)
413}
414
415fn local_uri(path: &Path) -> String {
416 let normalized = crate::io::storage::paths::normalize_local_path(path);
417 format!("local://{}", normalized.display())
418}
419
420fn resolve_s3_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
421 let bucket = definition.bucket.as_ref().ok_or_else(|| {
422 Box::new(ConfigError(format!(
423 "storage {} requires bucket for type s3",
424 definition.name
425 )))
426 })?;
427 if let Some((bucket_in_path, key)) = parse_s3_uri(raw_path) {
428 if bucket_in_path != *bucket {
429 return Err(Box::new(ConfigError(format!(
430 "storage {} bucket mismatch: {}",
431 definition.name, bucket_in_path
432 ))));
433 }
434 return Ok(format_s3_uri(bucket, &key));
435 }
436
437 let key = join_s3_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
438 Ok(format_s3_uri(bucket, &key))
439}
440
441fn resolve_adls_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
442 let account = definition.account.as_ref().ok_or_else(|| {
443 Box::new(ConfigError(format!(
444 "storage {} requires account for type adls",
445 definition.name
446 )))
447 })?;
448 let container = definition.container.as_ref().ok_or_else(|| {
449 Box::new(ConfigError(format!(
450 "storage {} requires container for type adls",
451 definition.name
452 )))
453 })?;
454 if let Some((container_in_path, account_in_path, path)) = parse_adls_uri(raw_path) {
455 if container_in_path != *container || account_in_path != *account {
456 return Err(Box::new(ConfigError(format!(
457 "storage {} adls account/container mismatch",
458 definition.name
459 ))));
460 }
461 return Ok(format_adls_uri(container, account, &path));
462 }
463 let prefix = definition.prefix.as_deref().unwrap_or("");
464 let combined = join_adls_path(prefix, raw_path);
465 Ok(format_adls_uri(container, account, &combined))
466}
467
/// Joins an ADLS `prefix` and `raw_path` with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of `raw_path`; a
/// trailing slash on `raw_path` is kept. An empty side is skipped entirely.
fn join_adls_path(prefix: &str, raw_path: &str) -> String {
    let mut joined = String::from(prefix.trim_matches('/'));
    let rest = raw_path.trim_start_matches('/');
    if !rest.is_empty() {
        if !joined.is_empty() {
            joined.push('/');
        }
        joined.push_str(rest);
    }
    joined
}
478
/// Builds `abfs://{container}@{account}.dfs.core.windows.net[/{path}]`.
///
/// The `/{path}` suffix is omitted when `path` is empty.
fn format_adls_uri(container: &str, account: &str, path: &str) -> String {
    let root = format!("abfs://{container}@{account}.dfs.core.windows.net");
    if path.is_empty() {
        root
    } else {
        format!("{root}/{path}")
    }
}
489
/// Splits `s3://bucket/key` into `(bucket, key)`.
///
/// The key may be empty. Returns `None` for other schemes or when the bucket is empty.
fn parse_s3_uri(value: &str) -> Option<(String, String)> {
    let without_scheme = value.strip_prefix("s3://")?;
    let (bucket, key) = without_scheme
        .split_once('/')
        .unwrap_or((without_scheme, ""));
    if bucket.is_empty() {
        return None;
    }
    Some((bucket.to_owned(), key.to_owned()))
}
500
/// Joins an object-key `prefix` and `raw_path` with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of `raw_path`; a
/// trailing slash on `raw_path` is kept. An empty side is skipped entirely.
fn join_s3_key(prefix: &str, raw_path: &str) -> String {
    let head = prefix.trim_matches('/');
    let tail = raw_path.trim_start_matches('/');
    if head.is_empty() {
        tail.to_string()
    } else if tail.is_empty() {
        head.to_string()
    } else {
        format!("{head}/{tail}")
    }
}
511
/// Builds `s3://{bucket}[/{key}]`.
///
/// The `/{key}` suffix is omitted when `key` is empty.
fn format_s3_uri(bucket: &str, key: &str) -> String {
    let mut uri = format!("s3://{bucket}");
    if !key.is_empty() {
        uri.push('/');
        uri.push_str(key);
    }
    uri
}
519
520fn resolve_gcs_uri(definition: &StorageDefinition, raw_path: &str) -> FloeResult<String> {
521 let bucket = definition.bucket.as_ref().ok_or_else(|| {
522 Box::new(ConfigError(format!(
523 "storage {} requires bucket for type gcs",
524 definition.name
525 )))
526 })?;
527 if let Some((bucket_in_path, key)) = parse_gcs_uri(raw_path) {
528 if bucket_in_path != *bucket {
529 return Err(Box::new(ConfigError(format!(
530 "storage {} bucket mismatch: {}",
531 definition.name, bucket_in_path
532 ))));
533 }
534 return Ok(format_gcs_uri(bucket, &key));
535 }
536
537 let key = join_gcs_key(definition.prefix.as_deref().unwrap_or(""), raw_path);
538 Ok(format_gcs_uri(bucket, &key))
539}
540
/// Splits `gs://bucket/key` into `(bucket, key)`.
///
/// The key may be empty. Returns `None` for other schemes or when the bucket is empty.
fn parse_gcs_uri(value: &str) -> Option<(String, String)> {
    let rest = value.strip_prefix("gs://")?;
    let (bucket, key) = match rest.find('/') {
        Some(slash) => (&rest[..slash], &rest[slash + 1..]),
        None => (rest, ""),
    };
    (!bucket.is_empty()).then(|| (bucket.to_owned(), key.to_owned()))
}
551
/// Joins a GCS key `prefix` and `raw_path` with a single `/`.
///
/// Slashes are trimmed from both ends of `prefix` and from the start of `raw_path`; a
/// trailing slash on `raw_path` is kept. An empty side is skipped entirely.
fn join_gcs_key(prefix: &str, raw_path: &str) -> String {
    let segments = [prefix.trim_matches('/'), raw_path.trim_start_matches('/')];
    segments
        .iter()
        .filter(|segment| !segment.is_empty())
        .copied()
        .collect::<Vec<&str>>()
        .join("/")
}
562
/// Builds `gs://{bucket}[/{key}]`.
///
/// The `/{key}` suffix is omitted when `key` is empty.
fn format_gcs_uri(bucket: &str, key: &str) -> String {
    match key {
        "" => format!("gs://{bucket}"),
        _ => format!("gs://{bucket}/{key}"),
    }
}
570
/// Splits `abfs://container@account.dfs.core.windows.net[/path]` into
/// `(container, account, path)`, with leading slashes removed from `path`.
///
/// The host must be exactly `{account}.dfs.core.windows.net`. The container and account must
/// be non-empty, and the container must not contain `/`. Anything else returns `None`.
fn parse_adls_uri(value: &str) -> Option<(String, String, String)> {
    let stripped = value.strip_prefix("abfs://")?;
    let (container, rest) = stripped.split_once('@')?;
    // Isolate the host first so that text glued onto the domain (`...windows.netfoo`,
    // `...windows.net.evil/x`) or a slash inside the account is not silently accepted.
    let (host, path) = rest.split_once('/').unwrap_or((rest, ""));
    let account = host.strip_suffix(".dfs.core.windows.net")?;
    if container.is_empty() || container.contains('/') || account.is_empty() {
        return None;
    }
    Some((
        container.to_string(),
        account.to_string(),
        path.trim_start_matches('/').to_string(),
    ))
}
581
/// Returns `key` up to (not including) its last `/`, or an empty string when `key` has no `/`.
fn parent_prefix(key: &str) -> String {
    key.rfind('/')
        .map(|last_slash| key[..last_slash].to_string())
        .unwrap_or_default()
}
588
/// Returns `true` when `value` starts with one of the supported object-store schemes
/// (`s3://`, `gs://`, `abfs://`).
fn is_remote_uri(value: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 3] = ["s3://", "gs://", "abfs://"];
    REMOTE_SCHEMES.iter().any(|scheme| value.starts_with(scheme))
}
592
593fn is_relative_path(value: &str) -> bool {
594 !is_remote_uri(value) && Path::new(value).is_relative()
595}